You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 16:06:08 UTC

[3/4] flink git commit: [FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
deleted file mode 100644
index 6c4659d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ /dev/null
@@ -1,1019 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("deprecation")
-public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
-	
-	private static final long HASH_MEM = 6*1024*1024;
-	
-	private static final long SORT_MEM = 3*1024*1024;
-
-	private static final int NUM_SORTER = 2;
-	
-	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
-
-	private final double bnljn_frac;
-
-	private final double hash_frac;
-	
-	@SuppressWarnings("unchecked")
-	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
-	
-	@SuppressWarnings("unchecked")
-	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
-	
-	private final List<Record> outList = new ArrayList<Record>();
-	
-	
-	public MatchTaskTest(ExecutionConfig config) {
-		super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
-		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
-		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
-	}
-	
-	
-	@Test
-	public void testSortBoth1MatchTask() {
-		final int keyCnt1 = 20;
-		final int valCnt1 = 1;
-		
-		final int keyCnt2 = 10;
-		final int valCnt2 = 2;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testSortBoth2MatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 1;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortBoth3MatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortBoth4MatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 1;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortBoth5MatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortFirstMatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortSecondMatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testMergeMatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testFailingMatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(new NirvanaOutputList());
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-		
-		try {
-			testDriver(testTask, MockFailingMatchStub.class);
-			Assert.fail("Driver did not forward Exception.");
-		} catch (ExpectedTestException e) {
-			// good!
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-	}
-	
-	@Test
-	public void testCancelMatchTaskWhileSort1() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-		
-		try {
-			setOutput(new NirvanaOutputList());
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-			setNumFileHandlesForSort(4);
-			
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-			
-			try {
-				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-			} catch (Exception e) {
-				e.printStackTrace();
-				Assert.fail("The test caused an exception.");
-			}
-	
-			final AtomicReference<Throwable> error = new AtomicReference<>();
-
-			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockMatchStub.class);
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-			taskRunner.start();
-
-			Thread.sleep(1000);
-
-			cancel();
-			taskRunner.interrupt();
-
-			taskRunner.join(60000);
-
-			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
-
-			Throwable taskError = error.get();
-			if (taskError != null) {
-				taskError.printStackTrace();
-				fail("Error in task while canceling: " + taskError.getMessage());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testCancelMatchTaskWhileSort2() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-		
-		try {
-			setOutput(new NirvanaOutputList());
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-			setNumFileHandlesForSort(4);
-			
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-			
-			try {
-				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-			} catch (Exception e) {
-				e.printStackTrace();
-				Assert.fail("The test caused an exception.");
-			}
-
-			final AtomicReference<Throwable> error = new AtomicReference<>();
-
-			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockMatchStub.class);
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-			taskRunner.start();
-
-			Thread.sleep(1000);
-
-			cancel();
-			taskRunner.interrupt();
-
-			taskRunner.join(60000);
-
-			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
-
-			Throwable taskError = error.get();
-			if (taskError != null) {
-				taskError.printStackTrace();
-				fail("Error in task while canceling: " + taskError.getMessage());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testCancelMatchTaskWhileMatching() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-		
-		try {
-			setOutput(new NirvanaOutputList());
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-			setNumFileHandlesForSort(4);
-			
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-			
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-			
-			final AtomicReference<Throwable> error = new AtomicReference<>();
-			
-			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockDelayingMatchStub.class);
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-			taskRunner.start();
-			
-			Thread.sleep(1000);
-			
-			cancel();
-			taskRunner.interrupt();
-			
-			taskRunner.join(60000);
-			
-			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
-			
-			Throwable taskError = error.get();
-			if (taskError != null) {
-				taskError.printStackTrace();
-				fail("Error in task while canceling: " + taskError.getMessage());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testHash1MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 10;
-		int valCnt2 = 2;
-				
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testHash2MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 1;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testHash3MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testHash4MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 1;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testHash5MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testFailingHashFirstMatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(new NirvanaOutputList());
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockFailingMatchStub.class);
-			Assert.fail("Function exception was not forwarded.");
-		} catch (ExpectedTestException etex) {
-			// good!
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-	}
-	
-	@Test
-	public void testFailingHashSecondMatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(new NirvanaOutputList());
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockFailingMatchStub.class);
-			Assert.fail("Function exception was not forwarded.");
-		} catch (ExpectedTestException etex) {
-			// good!
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-	}
-	
-	@Test
-	public void testCancelHashMatchTaskWhileBuildFirst() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-
-		try {
-			addInput(new DelayingInfinitiveInputIterator(100));
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-
-			setOutput(new NirvanaOutputList());
-
-			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-			getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
-			final AtomicBoolean success = new AtomicBoolean(false);
-
-			Thread taskRunner = new Thread() {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockMatchStub.class);
-						success.set(true);
-					} catch (Exception ie) {
-						ie.printStackTrace();
-					}
-				}
-			};
-			taskRunner.start();
-
-			Thread.sleep(1000);
-			cancel();
-
-			try {
-				taskRunner.join();
-			}
-			catch (InterruptedException ie) {
-				Assert.fail("Joining threads failed");
-			}
-
-			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testHashCancelMatchTaskWhileBuildSecond() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-
-		try {
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-			addInput(new DelayingInfinitiveInputIterator(100));
-
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-
-			setOutput(new NirvanaOutputList());
-
-			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-			getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
-			final AtomicBoolean success = new AtomicBoolean(false);
-
-			Thread taskRunner = new Thread() {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockMatchStub.class);
-						success.set(true);
-					} catch (Exception ie) {
-						ie.printStackTrace();
-					}
-				}
-			};
-			taskRunner.start();
-
-			Thread.sleep(1000);
-			cancel();
-
-			try {
-				taskRunner.join();
-			}
-			catch (InterruptedException ie) {
-				Assert.fail("Joining threads failed");
-			}
-
-			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testHashFirstCancelMatchTaskWhileMatching() {
-		int keyCnt = 20;
-		int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(new NirvanaOutputList());
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMatchStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
-				}
-			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
-		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-	}
-	
-	@Test
-	public void testHashSecondCancelMatchTaskWhileMatching() {
-		int keyCnt = 20;
-		int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(new NirvanaOutputList());
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMatchStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
-				}
-			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
-		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-	}
-	
-	// =================================================================================================
-	
-	public static final class MockMatchStub extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
-			out.collect(record1);
-		}
-	}
-	
-	public static final class MockFailingMatchStub extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		private int cnt = 0;
-		
-		@Override
-		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
-			if (++this.cnt >= 10) {
-				throw new ExpectedTestException();
-			}
-			out.collect(record1);
-		}
-	}
-	
-	public static final class MockDelayingMatchStub extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
-			try {
-				Thread.sleep(100);
-			} catch (InterruptedException e) {
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index f59c4a3..415b6bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -26,9 +26,9 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -46,7 +46,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 	private final RecordComparator comparator = new RecordComparator(
 		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
 	
-	private final List<Record> outList = new ArrayList<Record>();
+	private final List<Record> outList = new ArrayList<>();
 	
 	
 	public ReduceTaskExternalITCase(ExecutionConfig config) {
@@ -68,7 +68,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 			
 			testDriver(testTask, MockReduceStub.class);
 		} catch (Exception e) {
@@ -100,7 +100,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 			
 			testDriver(testTask, MockReduceStub.class);
 		} catch (Exception e) {
@@ -130,14 +130,14 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		
 		CombiningUnilateralSortMerger<Record> sorter = null;
 		try {
-			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
+			sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
 				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
 					this.perSortFractionMem,
 					2, 0.8f, true);
 			addInput(sorter.getIterator());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 			testDriver(testTask, MockCombiningReduceStub.class);
 		} catch (Exception e) {
@@ -176,14 +176,14 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		
 		CombiningUnilateralSortMerger<Record> sorter = null;
 		try {
-			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
+			sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
 				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
 					this.perSortFractionMem,
 					2, 0.8f, false);
 			addInput(sorter.getIterator());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 			testDriver(testTask, MockCombiningReduceStub.class);
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index cc25c99..8bc7fe5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -27,9 +27,9 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
@@ -51,7 +52,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 	private final RecordComparator comparator = new RecordComparator(
 		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
 	
-	private final List<Record> outList = new ArrayList<Record>();
+	private final List<Record> outList = new ArrayList<>();
 
 	public ReduceTaskTest(ExecutionConfig config) {
 		super(config, 0, 1, 3*1024*1024);
@@ -69,7 +70,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 			
 			testDriver(testTask, MockReduceStub.class);
 		} catch (Exception e) {
@@ -96,7 +97,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
-		GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+		GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 		try {
 			testDriver(testTask, MockReduceStub.class);
@@ -125,13 +126,13 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		
 		CombiningUnilateralSortMerger<Record> sorter = null;
 		try {
-			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
+			sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
 				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem,
 					4, 0.8f, true);
 			addInput(sorter.getIterator());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 			testDriver(testTask, MockCombiningReduceStub.class);
 		} catch (Exception e) {
@@ -168,7 +169,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
-		GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+		GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingReduceStub.class);
@@ -190,7 +191,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
-		final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+		final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 		try {
 			addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator.duplicate());
@@ -238,7 +239,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
-		final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+		final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 1f19699..542812c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -21,17 +21,17 @@ package org.apache.flink.runtime.operators.chaining;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericCollectorMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.BatchTask;
-import org.apache.flink.runtime.operators.MapTaskTest.MockMapStub;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.FlatMapTaskTest.MockMapStub;
 import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -49,14 +49,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
-@SuppressWarnings("deprecation")
 public class ChainTaskTest extends TaskTestBase {
 	
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
 
 	private static final int NETWORK_BUFFER_SIZE = 1024;
 	
-	private final List<Record> outList = new ArrayList<Record>();
+	private final List<Record> outList = new ArrayList<>();
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparatorFactory compFact = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}, new boolean[] {true});
@@ -95,16 +94,16 @@ public class ChainTaskTest extends TaskTestBase {
 				combineConfig.setRelativeMemoryDriver(memoryFraction);
 				
 				// udf
-				combineConfig.setStubWrapper(new UserCodeClassWrapper<MockReduceStub>(MockReduceStub.class));
+				combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockReduceStub.class));
 				
 				getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, combineConfig, "combine");
 			}
 			
 			// chained map+combine
 			{
-				BatchTask<GenericCollectorMap<Record, Record>, Record> testTask =
-											new BatchTask<GenericCollectorMap<Record, Record>, Record>();
-				registerTask(testTask, CollectorMapDriver.class, MockMapStub.class);
+				BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
+											new BatchTask<>();
+				registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
 				
 				try {
 					testTask.invoke();
@@ -156,17 +155,17 @@ public class ChainTaskTest extends TaskTestBase {
 				combineConfig.setRelativeMemoryDriver(memoryFraction);
 				
 				// udf
-				combineConfig.setStubWrapper(new UserCodeClassWrapper<MockFailingCombineStub>(MockFailingCombineStub.class));
+				combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockFailingCombineStub.class));
 				
 				getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, combineConfig, "combine");
 			}
 			
 			// chained map+combine
 			{
-				final BatchTask<GenericCollectorMap<Record, Record>, Record> testTask =
-											new BatchTask<GenericCollectorMap<Record, Record>, Record>();
+				final BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
+											new BatchTask<>();
 				
-				super.registerTask(testTask, CollectorMapDriver.class, MockMapStub.class);
+				super.registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
 	
 				boolean stubFailed = false;
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 777bfc8..63f54ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.operators.testutils;
 
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
@@ -94,7 +94,7 @@ public abstract class TaskTestBase extends TestLogger {
 		
 		final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		config.setDriver(driver);
-		config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass));
+		config.setStubWrapper(new UserCodeClassWrapper<>(stubClass));
 		
 		task.setEnvironment(this.mockEnv);
 
@@ -116,17 +116,17 @@ public abstract class TaskTestBase extends TestLogger {
 		}
 	}
 
-	public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) {
+	public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat<Record>> stubClass, String outPath) {
 		registerFileOutputTask(outTask, InstantiationUtil.instantiate(stubClass, FileOutputFormat.class), outPath);
 	}
 	
-	public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat outputFormat, String outPath) {
+	public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat<Record> outputFormat, String outPath) {
 		TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		
 		outputFormat.setOutputFilePath(new Path(outPath));
 		outputFormat.setWriteMode(WriteMode.OVERWRITE);
 
-		dsConfig.setStubWrapper(new UserCodeObjectWrapper<FileOutputFormat>(outputFormat));
+		dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(outputFormat));
 
 		outTask.setEnvironment(this.mockEnv);
 
@@ -139,9 +139,9 @@ public abstract class TaskTestBase extends TestLogger {
 	}
 
 	public void registerFileInputTask(AbstractInvokable inTask,
-			Class<? extends DelimitedInputFormat> stubClass, String inPath, String delimiter)
+			Class<? extends DelimitedInputFormat<Record>> stubClass, String inPath, String delimiter)
 	{
-		DelimitedInputFormat format;
+		DelimitedInputFormat<Record> format;
 		try {
 			format = stubClass.newInstance();
 		}
@@ -153,7 +153,7 @@ public abstract class TaskTestBase extends TestLogger {
 		format.setDelimiter(delimiter);
 		
 		TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
-		dsConfig.setStubWrapper(new UserCodeObjectWrapper<DelimitedInputFormat>(format));
+		dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(format));
 		
 		this.inputSplitProvider.addInputSplits(inPath, 5);