You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 15:34:51 UTC

[1/5] flink git commit: [FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests

Repository: flink
Updated Branches:
  refs/heads/master 7ff071f66 -> bbb75c599


http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
deleted file mode 100644
index 6c4659d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ /dev/null
@@ -1,1019 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("deprecation")
-public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
-	
-	private static final long HASH_MEM = 6*1024*1024;
-	
-	private static final long SORT_MEM = 3*1024*1024;
-
-	private static final int NUM_SORTER = 2;
-	
-	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
-
-	private final double bnljn_frac;
-
-	private final double hash_frac;
-	
-	@SuppressWarnings("unchecked")
-	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
-	
-	@SuppressWarnings("unchecked")
-	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
-	
-	private final List<Record> outList = new ArrayList<Record>();
-	
-	
-	public MatchTaskTest(ExecutionConfig config) {
-		super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
-		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
-		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
-	}
-	
-	
-	@Test
-	public void testSortBoth1MatchTask() {
-		final int keyCnt1 = 20;
-		final int valCnt1 = 1;
-		
-		final int keyCnt2 = 10;
-		final int valCnt2 = 2;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testSortBoth2MatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 1;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortBoth3MatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortBoth4MatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 1;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortBoth5MatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortFirstMatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testSortSecondMatchTask() {
-
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testMergeMatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(this.outList);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-		
-		this.outList.clear();
-		
-	}
-	
-	@Test
-	public void testFailingMatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		setOutput(new NirvanaOutputList());
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-		
-		try {
-			testDriver(testTask, MockFailingMatchStub.class);
-			Assert.fail("Driver did not forward Exception.");
-		} catch (ExpectedTestException e) {
-			// good!
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-	}
-	
-	@Test
-	public void testCancelMatchTaskWhileSort1() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-		
-		try {
-			setOutput(new NirvanaOutputList());
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-			setNumFileHandlesForSort(4);
-			
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-			
-			try {
-				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-			} catch (Exception e) {
-				e.printStackTrace();
-				Assert.fail("The test caused an exception.");
-			}
-	
-			final AtomicReference<Throwable> error = new AtomicReference<>();
-
-			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockMatchStub.class);
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-			taskRunner.start();
-
-			Thread.sleep(1000);
-
-			cancel();
-			taskRunner.interrupt();
-
-			taskRunner.join(60000);
-
-			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
-
-			Throwable taskError = error.get();
-			if (taskError != null) {
-				taskError.printStackTrace();
-				fail("Error in task while canceling: " + taskError.getMessage());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testCancelMatchTaskWhileSort2() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-		
-		try {
-			setOutput(new NirvanaOutputList());
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-			setNumFileHandlesForSort(4);
-			
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-			
-			try {
-				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-			} catch (Exception e) {
-				e.printStackTrace();
-				Assert.fail("The test caused an exception.");
-			}
-
-			final AtomicReference<Throwable> error = new AtomicReference<>();
-
-			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockMatchStub.class);
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-			taskRunner.start();
-
-			Thread.sleep(1000);
-
-			cancel();
-			taskRunner.interrupt();
-
-			taskRunner.join(60000);
-
-			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
-
-			Throwable taskError = error.get();
-			if (taskError != null) {
-				taskError.printStackTrace();
-				fail("Error in task while canceling: " + taskError.getMessage());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testCancelMatchTaskWhileMatching() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-		
-		try {
-			setOutput(new NirvanaOutputList());
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-			setNumFileHandlesForSort(4);
-			
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-			
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-			
-			final AtomicReference<Throwable> error = new AtomicReference<>();
-			
-			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockDelayingMatchStub.class);
-					}
-					catch (Throwable t) {
-						error.set(t);
-					}
-				}
-			};
-			taskRunner.start();
-			
-			Thread.sleep(1000);
-			
-			cancel();
-			taskRunner.interrupt();
-			
-			taskRunner.join(60000);
-			
-			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
-			
-			Throwable taskError = error.get();
-			if (taskError != null) {
-				taskError.printStackTrace();
-				fail("Error in task while canceling: " + taskError.getMessage());
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testHash1MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 10;
-		int valCnt2 = 2;
-				
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testHash2MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 1;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testHash3MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 1;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testHash4MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 1;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testHash5MatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.outList);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
-		this.outList.clear();
-	}
-	
-	@Test
-	public void testFailingHashFirstMatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(new NirvanaOutputList());
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockFailingMatchStub.class);
-			Assert.fail("Function exception was not forwarded.");
-		} catch (ExpectedTestException etex) {
-			// good!
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-	}
-	
-	@Test
-	public void testFailingHashSecondMatchTask() {
-		int keyCnt1 = 20;
-		int valCnt1 = 20;
-		
-		int keyCnt2 = 20;
-		int valCnt2 = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(new NirvanaOutputList());
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockFailingMatchStub.class);
-			Assert.fail("Function exception was not forwarded.");
-		} catch (ExpectedTestException etex) {
-			// good!
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-	}
-	
-	@Test
-	public void testCancelHashMatchTaskWhileBuildFirst() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-
-		try {
-			addInput(new DelayingInfinitiveInputIterator(100));
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-
-			setOutput(new NirvanaOutputList());
-
-			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-			getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
-			final AtomicBoolean success = new AtomicBoolean(false);
-
-			Thread taskRunner = new Thread() {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockMatchStub.class);
-						success.set(true);
-					} catch (Exception ie) {
-						ie.printStackTrace();
-					}
-				}
-			};
-			taskRunner.start();
-
-			Thread.sleep(1000);
-			cancel();
-
-			try {
-				taskRunner.join();
-			}
-			catch (InterruptedException ie) {
-				Assert.fail("Joining threads failed");
-			}
-
-			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testHashCancelMatchTaskWhileBuildSecond() {
-		final int keyCnt = 20;
-		final int valCnt = 20;
-
-		try {
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-			addInput(new DelayingInfinitiveInputIterator(100));
-
-			addDriverComparator(this.comparator1);
-			addDriverComparator(this.comparator2);
-
-			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-
-			setOutput(new NirvanaOutputList());
-
-			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-			getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
-			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
-			final AtomicBoolean success = new AtomicBoolean(false);
-
-			Thread taskRunner = new Thread() {
-				@Override
-				public void run() {
-					try {
-						testDriver(testTask, MockMatchStub.class);
-						success.set(true);
-					} catch (Exception ie) {
-						ie.printStackTrace();
-					}
-				}
-			};
-			taskRunner.start();
-
-			Thread.sleep(1000);
-			cancel();
-
-			try {
-				taskRunner.join();
-			}
-			catch (InterruptedException ie) {
-				Assert.fail("Joining threads failed");
-			}
-
-			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testHashFirstCancelMatchTaskWhileMatching() {
-		int keyCnt = 20;
-		int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(new NirvanaOutputList());
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMatchStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
-				}
-			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
-		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-	}
-	
-	@Test
-	public void testHashSecondCancelMatchTaskWhileMatching() {
-		int keyCnt = 20;
-		int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(new NirvanaOutputList());
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMatchStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
-				}
-			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
-		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-	}
-	
-	// =================================================================================================
-	
-	public static final class MockMatchStub extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
-			out.collect(record1);
-		}
-	}
-	
-	public static final class MockFailingMatchStub extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		private int cnt = 0;
-		
-		@Override
-		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
-			if (++this.cnt >= 10) {
-				throw new ExpectedTestException();
-			}
-			out.collect(record1);
-		}
-	}
-	
-	public static final class MockDelayingMatchStub extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
-			try {
-				Thread.sleep(100);
-			} catch (InterruptedException e) {
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index f59c4a3..415b6bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -26,9 +26,9 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -46,7 +46,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 	private final RecordComparator comparator = new RecordComparator(
 		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
 	
-	private final List<Record> outList = new ArrayList<Record>();
+	private final List<Record> outList = new ArrayList<>();
 	
 	
 	public ReduceTaskExternalITCase(ExecutionConfig config) {
@@ -68,7 +68,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 			
 			testDriver(testTask, MockReduceStub.class);
 		} catch (Exception e) {
@@ -100,7 +100,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 			
 			testDriver(testTask, MockReduceStub.class);
 		} catch (Exception e) {
@@ -130,14 +130,14 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		
 		CombiningUnilateralSortMerger<Record> sorter = null;
 		try {
-			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
+			sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
 				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
 					this.perSortFractionMem,
 					2, 0.8f, true);
 			addInput(sorter.getIterator());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 			testDriver(testTask, MockCombiningReduceStub.class);
 		} catch (Exception e) {
@@ -176,14 +176,14 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
 		
 		CombiningUnilateralSortMerger<Record> sorter = null;
 		try {
-			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
+			sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
 				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
 					this.perSortFractionMem,
 					2, 0.8f, false);
 			addInput(sorter.getIterator());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 			testDriver(testTask, MockCombiningReduceStub.class);
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index cc25c99..8bc7fe5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -27,9 +27,9 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
 import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
 import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
@@ -51,7 +52,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 	private final RecordComparator comparator = new RecordComparator(
 		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
 	
-	private final List<Record> outList = new ArrayList<Record>();
+	private final List<Record> outList = new ArrayList<>();
 
 	public ReduceTaskTest(ExecutionConfig config) {
 		super(config, 0, 1, 3*1024*1024);
@@ -69,7 +70,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		try {
 			addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 			
 			testDriver(testTask, MockReduceStub.class);
 		} catch (Exception e) {
@@ -96,7 +97,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
-		GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+		GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 		try {
 			testDriver(testTask, MockReduceStub.class);
@@ -125,13 +126,13 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		
 		CombiningUnilateralSortMerger<Record> sorter = null;
 		try {
-			sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(), 
+			sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
 				getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false), 
 				getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem,
 					4, 0.8f, true);
 			addInput(sorter.getIterator());
 			
-			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+			GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 			testDriver(testTask, MockCombiningReduceStub.class);
 		} catch (Exception e) {
@@ -168,7 +169,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		setOutput(this.outList);
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
-		GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+		GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingReduceStub.class);
@@ -190,7 +191,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
-		final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+		final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 		try {
 			addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator.duplicate());
@@ -238,7 +239,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
 		setOutput(new NirvanaOutputList());
 		getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
 		
-		final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+		final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 1f19699..542812c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -21,17 +21,17 @@ package org.apache.flink.runtime.operators.chaining;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.GenericCollectorMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.BatchTask;
-import org.apache.flink.runtime.operators.MapTaskTest.MockMapStub;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.FlatMapTaskTest.MockMapStub;
 import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -49,14 +49,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
-@SuppressWarnings("deprecation")
 public class ChainTaskTest extends TaskTestBase {
 	
 	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
 
 	private static final int NETWORK_BUFFER_SIZE = 1024;
 	
-	private final List<Record> outList = new ArrayList<Record>();
+	private final List<Record> outList = new ArrayList<>();
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparatorFactory compFact = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}, new boolean[] {true});
@@ -95,16 +94,16 @@ public class ChainTaskTest extends TaskTestBase {
 				combineConfig.setRelativeMemoryDriver(memoryFraction);
 				
 				// udf
-				combineConfig.setStubWrapper(new UserCodeClassWrapper<MockReduceStub>(MockReduceStub.class));
+				combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockReduceStub.class));
 				
 				getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, combineConfig, "combine");
 			}
 			
 			// chained map+combine
 			{
-				BatchTask<GenericCollectorMap<Record, Record>, Record> testTask =
-											new BatchTask<GenericCollectorMap<Record, Record>, Record>();
-				registerTask(testTask, CollectorMapDriver.class, MockMapStub.class);
+				BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
+											new BatchTask<>();
+				registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
 				
 				try {
 					testTask.invoke();
@@ -156,17 +155,17 @@ public class ChainTaskTest extends TaskTestBase {
 				combineConfig.setRelativeMemoryDriver(memoryFraction);
 				
 				// udf
-				combineConfig.setStubWrapper(new UserCodeClassWrapper<MockFailingCombineStub>(MockFailingCombineStub.class));
+				combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockFailingCombineStub.class));
 				
 				getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, combineConfig, "combine");
 			}
 			
 			// chained map+combine
 			{
-				final BatchTask<GenericCollectorMap<Record, Record>, Record> testTask =
-											new BatchTask<GenericCollectorMap<Record, Record>, Record>();
+				final BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
+											new BatchTask<>();
 				
-				super.registerTask(testTask, CollectorMapDriver.class, MockMapStub.class);
+				super.registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
 	
 				boolean stubFailed = false;
 				

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 777bfc8..63f54ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -19,11 +19,11 @@
 package org.apache.flink.runtime.operators.testutils;
 
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
@@ -94,7 +94,7 @@ public abstract class TaskTestBase extends TestLogger {
 		
 		final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		config.setDriver(driver);
-		config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass));
+		config.setStubWrapper(new UserCodeClassWrapper<>(stubClass));
 		
 		task.setEnvironment(this.mockEnv);
 
@@ -116,17 +116,17 @@ public abstract class TaskTestBase extends TestLogger {
 		}
 	}
 
-	public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) {
+	public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat<Record>> stubClass, String outPath) {
 		registerFileOutputTask(outTask, InstantiationUtil.instantiate(stubClass, FileOutputFormat.class), outPath);
 	}
 	
-	public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat outputFormat, String outPath) {
+	public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat<Record> outputFormat, String outPath) {
 		TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		
 		outputFormat.setOutputFilePath(new Path(outPath));
 		outputFormat.setWriteMode(WriteMode.OVERWRITE);
 
-		dsConfig.setStubWrapper(new UserCodeObjectWrapper<FileOutputFormat>(outputFormat));
+		dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(outputFormat));
 
 		outTask.setEnvironment(this.mockEnv);
 
@@ -139,9 +139,9 @@ public abstract class TaskTestBase extends TestLogger {
 	}
 
 	public void registerFileInputTask(AbstractInvokable inTask,
-			Class<? extends DelimitedInputFormat> stubClass, String inPath, String delimiter)
+			Class<? extends DelimitedInputFormat<Record>> stubClass, String inPath, String delimiter)
 	{
-		DelimitedInputFormat format;
+		DelimitedInputFormat<Record> format;
 		try {
 			format = stubClass.newInstance();
 		}
@@ -153,7 +153,7 @@ public abstract class TaskTestBase extends TestLogger {
 		format.setDelimiter(delimiter);
 		
 		TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
-		dsConfig.setStubWrapper(new UserCodeObjectWrapper<DelimitedInputFormat>(format));
+		dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(format));
 		
 		this.inputSplitProvider.addInputSplits(inPath, 5);
 


[4/5] flink git commit: [FLINK-2898] [build] Invert Travis CI build order

Posted by fh...@apache.org.
[FLINK-2898] [build] Invert Travis CI build order

This closes #1290


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2e4a27d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2e4a27d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2e4a27d

Branch: refs/heads/master
Commit: d2e4a27d66faf756c8da5236588c4043ba559ecd
Parents: ab2895f
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Oct 22 11:12:43 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 13:05:36 2015 +0200

----------------------------------------------------------------------
 .travis.yml | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d2e4a27d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index a626519..7213a0c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -14,16 +14,16 @@ language: java
 #See https://issues.apache.org/jira/browse/FLINK-1072
 matrix:
   include:
-    - jdk: "openjdk7" # this uploads the Hadoop 1 build to Maven and S3
-      env: PROFILE="-Dhadoop.profile=1"
-    - jdk: "oraclejdk7" # this uploads the Hadoop 2 build to Maven and S3
-      env: PROFILE="-Dhadoop.version=2.3.0"
-    - jdk: "openjdk7"
-      env: PROFILE="-Dhadoop.version=2.4.0 -Dscala-2.11 -Pinclude-yarn-tests"
-    - jdk: "oraclejdk8"
-      env: PROFILE="-Dhadoop.version=2.5.0 -Dmaven.javadoc.skip=true -Pinclude-yarn-tests"
     - jdk: "oraclejdk8"
       env: PROFILE="-Dhadoop.version=2.6.0 -Dscala-2.11 -Pinclude-tez -Pinclude-yarn-tests -Dmaven.javadoc.skip=true"
+    - jdk: "oraclejdk8"
+      env: PROFILE="-Dhadoop.version=2.5.0 -Dmaven.javadoc.skip=true -Pinclude-yarn-tests"
+    - jdk: "openjdk7"
+      env: PROFILE="-Dhadoop.version=2.4.0 -Dscala-2.11 -Pinclude-yarn-tests"
+    - jdk: "oraclejdk7" # this uploads the Hadoop 2 build to Maven and S3
+      env: PROFILE="-Dhadoop.version=2.3.0"
+    - jdk: "openjdk7" # this uploads the Hadoop 1 build to Maven and S3
+      env: PROFILE="-Dhadoop.profile=1"
 
 
 git:


[3/5] flink git commit: [FLINK-2893] [runtime] Consistent naming of recovery config parameters

Posted by fh...@apache.org.
[FLINK-2893] [runtime] Consistent naming of recovery config parameters

Rename config key prefix from 'ha.zookeeper' to 'recovery.zookeeper'
Rename config key from 'state.backend.fs.dir.recovery' => 'state.backend.fs.recoverydir'
Move ZooKeeper file system state backend configuration keys

This closes #1286


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab2895fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab2895fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab2895fa

Branch: refs/heads/master
Commit: ab2895fa988018300395557d110de23a3a2166c9
Parents: 3c8a658
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Oct 22 12:58:26 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 13:03:02 2015 +0200

----------------------------------------------------------------------
 docs/setup/config.md                            | 18 +++++------
 docs/setup/jobmanager_high_availability.md      |  6 ++--
 .../flink/configuration/ConfigConstants.java    | 34 ++++++++++----------
 flink-dist/src/main/resources/flink-conf.yaml   |  2 +-
 .../apache/flink/runtime/blob/BlobServer.java   |  2 +-
 .../flink/runtime/blob/FileSystemBlobStore.java |  4 +--
 .../flink/runtime/util/ZooKeeperUtils.java      |  4 +--
 .../flink/runtime/blob/BlobRecoveryITCase.java  |  2 +-
 .../BlobLibraryCacheRecoveryITCase.java         |  2 +-
 .../runtime/testutils/JobManagerProcess.java    |  2 +-
 .../runtime/testutils/TaskManagerProcess.java   |  2 +-
 .../runtime/testutils/ZooKeeperTestUtils.java   |  2 +-
 .../flink/test/recovery/ChaosMonkeyITCase.java  |  2 +-
 .../ZooKeeperLeaderElectionITCase.java          |  4 +--
 .../flink/yarn/YARNHighAvailabilityITCase.java  |  4 +--
 15 files changed, 45 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 0e7b2ee..447a329 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -390,23 +390,23 @@ Flink supports the 'standalone' mode where only a single JobManager runs and no
 The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing.
 Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution.
 In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state.
-In order to use the 'zookeeper' mode, it is mandatory to also define the `ha.zookeeper.quorum` configuration value.
+In order to use the 'zookeeper' mode, it is mandatory to also define the `recovery.zookeeper.quorum` configuration value.
 
-- `ha.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected
+- `recovery.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connet to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected
 
-- `ha.zookeeper.dir`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes. 
+- `recovery.zookeeper.path.root`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes. 
 
-- `ha.zookeeper.dir.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.
+- `recovery.zookeeper.path.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.
 
-- `ha.zookeeper.dir.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID
+- `recovery.zookeeper.path.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID
 
-- `ha.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.
+- `recovery.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.
 
-- `ha.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.
+- `recovery.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.
 
-- `ha.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.
+- `recovery.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.
 
-- `ha.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.
+- `recovery.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.
 
 ## Background
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/docs/setup/jobmanager_high_availability.md
----------------------------------------------------------------------
diff --git a/docs/setup/jobmanager_high_availability.md b/docs/setup/jobmanager_high_availability.md
index 50379ea..55b15ad 100644
--- a/docs/setup/jobmanager_high_availability.md
+++ b/docs/setup/jobmanager_high_availability.md
@@ -50,13 +50,13 @@ In high availabliity mode, all Flink components try to connect to a JobManager v
 
 - **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.
   
-  <pre>ha.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
+  <pre>recovery.zookeeper.quorum: address1:2181[,...],addressX:2181</pre>
 
   Each *addressX:port* refers to a ZooKeeper server, which is reachable by Flink at the given address and port.
 
 - The following configuration keys are optional:
 
-  - `ha.zookeeper.dir: /flink [default]`: ZooKeeper directory to use for coordination
+  - `recovery.zookeeper.path.root: /flink [default]`: ZooKeeper directory to use for coordination
   - TODO Add client configuration keys
 
 ## Starting an HA-cluster
@@ -93,7 +93,7 @@ The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each
    
    <pre>
 recovery.mode: zookeeper
-ha.zookeeper.quorum: localhost</pre>
+recovery.zookeeper.quorum: localhost</pre>
 
 2. **Configure masters** in `conf/masters`:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index fc2087a..b64939e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -410,12 +410,6 @@ public final class ConfigConstants {
 	 */
 	public static final String STATE_BACKEND = "state.backend";
 	
-	/**
-	 * File system state backend base path for recoverable state handles. Recovery state is written
-	 * to this path and the file state handles are persisted for recovery.
-	 */
-	public static final String STATE_BACKEND_FS_RECOVERY_PATH = "state.backend.fs.dir.recovery";
-	
 	// ----------------------------- Miscellaneous ----------------------------
 	
 	/**
@@ -433,31 +427,37 @@ public final class ConfigConstants {
 	// --------------------------- ZooKeeper ----------------------------------
 
 	/** ZooKeeper servers. */
-	public static final String ZOOKEEPER_QUORUM_KEY = "ha.zookeeper.quorum";
+	public static final String ZOOKEEPER_QUORUM_KEY = "recovery.zookeeper.quorum";
+
+	/**
+	 * File system state backend base path for recoverable state handles. Recovery state is written
+	 * to this path and the file state handles are persisted for recovery.
+	 */
+	public static final String ZOOKEEPER_RECOVERY_PATH = "recovery.zookeeper.storageDir";
 
 	/** ZooKeeper root path. */
-	public static final String ZOOKEEPER_DIR_KEY = "ha.zookeeper.dir";
+	public static final String ZOOKEEPER_DIR_KEY = "recovery.zookeeper.path.root";
 
-	public static final String ZOOKEEPER_LATCH_PATH = "ha.zookeeper.dir.latch";
+	public static final String ZOOKEEPER_LATCH_PATH = "recovery.zookeeper.path.latch";
 
-	public static final String ZOOKEEPER_LEADER_PATH = "ha.zookeeper.dir.leader";
+	public static final String ZOOKEEPER_LEADER_PATH = "recovery.zookeeper.path.leader";
 
 	/** ZooKeeper root path (ZNode) for job graphs. */
-	public static final String ZOOKEEPER_JOBGRAPHS_PATH = "ha.zookeeper.dir.jobgraphs";
+	public static final String ZOOKEEPER_JOBGRAPHS_PATH = "recovery.zookeeper.path.jobgraphs";
 
 	/** ZooKeeper root path (ZNode) for completed checkpoints. */
-	public static final String ZOOKEEPER_CHECKPOINTS_PATH = "ha.zookeeper.dir.checkpoints";
+	public static final String ZOOKEEPER_CHECKPOINTS_PATH = "recovery.zookeeper.path.checkpoints";
 
 	/** ZooKeeper root path (ZNode) for checkpoint counters. */
-	public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "ha.zookeeper.dir.checkpoint-counter";
+	public static final String ZOOKEEPER_CHECKPOINT_COUNTER_PATH = "recovery.zookeeper.path.checkpoint-counter";
 
-	public static final String ZOOKEEPER_SESSION_TIMEOUT = "ha.zookeeper.client.session-timeout";
+	public static final String ZOOKEEPER_SESSION_TIMEOUT = "recovery.zookeeper.client.session-timeout";
 
-	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "ha.zookeeper.client.connection-timeout";
+	public static final String ZOOKEEPER_CONNECTION_TIMEOUT = "recovery.zookeeper.client.connection-timeout";
 
-	public static final String ZOOKEEPER_RETRY_WAIT = "ha.zookeeper.client.retry-wait";
+	public static final String ZOOKEEPER_RETRY_WAIT = "recovery.zookeeper.client.retry-wait";
 
-	public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "ha.zookeeper.client.max-retry-attempts";
+	public static final String ZOOKEEPER_MAX_RETRY_ATTEMPTS = "recovery.zookeeper.client.max-retry-attempts";
 
 	// ------------------------------------------------------------------------
 	//                            Default Values

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-dist/src/main/resources/flink-conf.yaml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index 157e34a..5928d93 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -137,4 +137,4 @@ webclient.port: 8080
 #
 # recovery.mode: zookeeper
 #
-# ha.zookeeper.quorum: localhost
+# recovery.zookeeper.quorum: localhost

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index d0bed8c..851cff4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -100,7 +100,7 @@ public class BlobServer extends Thread implements BlobService {
 		// Recovery. Check that everything has been setup correctly. This is not clean, but it's
 		// better to resolve this with some upcoming changes to the state backend setup.
 		else if (config.containsKey(ConfigConstants.STATE_BACKEND) &&
-				config.containsKey(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH)) {
+				config.containsKey(ConfigConstants.ZOOKEEPER_RECOVERY_PATH)) {
 
 			this.blobStore = new FileSystemBlobStore(config);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
index 4351eb1..4c63873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/FileSystemBlobStore.java
@@ -51,12 +51,12 @@ class FileSystemBlobStore implements BlobStore {
 
 	FileSystemBlobStore(Configuration config) throws IOException {
 		String stateBackendBasePath = config.getString(
-				ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+				ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
 
 		if (stateBackendBasePath.equals("")) {
 			throw new IllegalConfigurationException(String.format("Missing configuration for " +
 				"file system state backend recovery path. Please specify via " +
-				"'%s' key.", ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH));
+				"'%s' key.", ConfigConstants.ZOOKEEPER_RECOVERY_PATH));
 		}
 
 		stateBackendBasePath += "/blob";

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index a32fc65..79bd28b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -276,11 +276,11 @@ public class ZooKeeperUtils {
 			String prefix) throws IOException {
 
 		String rootPath = configuration.getString(
-			ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, "");
+			ConfigConstants.ZOOKEEPER_RECOVERY_PATH, "");
 
 		if (rootPath.equals("")) {
 			throw new IllegalConfigurationException("Missing recovery path. Specify via " +
-				"configuration key '" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "'.");
+				"configuration key '" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "'.");
 		} else {
 			return new FileSystemStateStorageHelper<T>(rootPath, prefix);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
index 0e324a8..2eff4fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobRecoveryITCase.java
@@ -70,7 +70,7 @@ public class BlobRecoveryITCase {
 			Configuration config = new Configuration();
 			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, recoveryDir.getPath());
+			config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, recoveryDir.getPath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
index ea4195c..6068322 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/execution/librarycache/BlobLibraryCacheRecoveryITCase.java
@@ -65,7 +65,7 @@ public class BlobLibraryCacheRecoveryITCase {
 			Configuration config = new Configuration();
 			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
-			config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath());
+			config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, temporaryFolder.getRoot().getAbsolutePath());
 
 			for (int i = 0; i < server.length; i++) {
 				server[i] = new BlobServer(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
index 85b768d..0641493 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/JobManagerProcess.java
@@ -197,7 +197,7 @@ public class JobManagerProcess extends TestJvmProcess {
 		 * <code>--port PORT</code>.
 		 *
 		 * <p>Other arguments are parsed to a {@link Configuration} and passed to the
-		 * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum
+		 * JobManager, for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum
 		 * "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
index f683c55..86449a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TaskManagerProcess.java
@@ -99,7 +99,7 @@ public class TaskManagerProcess extends TestJvmProcess {
 		 * and streaming jobs).
 		 *
 		 * <p>All arguments are parsed to a {@link Configuration} and passed to the Taskmanager,
-		 * for instance: <code>--recovery.mode ZOOKEEPER --ha.zookeeper.quorum "xyz:123:456"</code>.
+		 * for instance: <code>--recovery.mode ZOOKEEPER --recovery.zookeeper.quorum "xyz:123:456"</code>.
 		 */
 		public static void main(String[] args) throws Exception {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
index a65ec01..e0f8625 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/ZooKeeperTestUtils.java
@@ -81,7 +81,7 @@ public class ZooKeeperTestUtils {
 		// File system state backend
 		config.setString(ConfigConstants.STATE_BACKEND, "FILESYSTEM");
 		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, fsStateHandlePath + "/checkpoints");
-		config.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, fsStateHandlePath + "/recovery");
+		config.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, fsStateHandlePath + "/recovery");
 
 		// Akka failure detection and execution retries
 		config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "1000 ms");

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
index f15644e..f536418 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ChaosMonkeyITCase.java
@@ -556,7 +556,7 @@ public class ChaosMonkeyITCase {
 					0, files.length);
 		}
 
-		File fsRecovery = new File(config.getString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, ""));
+		File fsRecovery = new File(config.getString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, ""));
 
 		LOG.info("Checking " + fsRecovery);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 5840a98..10417c8 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -95,7 +95,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+		configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
 
@@ -144,7 +144,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
 		configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
 		configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTM);
 		configuration.setString(ConfigConstants.STATE_BACKEND, "filesystem");
-		configuration.setString(ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
+		configuration.setString(ConfigConstants.ZOOKEEPER_RECOVERY_PATH, tempDirectory.getAbsoluteFile().toURI().toString());
 
 		// we "effectively" disable the automatic RecoverAllJobs message and sent it manually to make
 		// sure that all TMs have registered to the JM prior to issueing the RecoverAllJobs message

http://git-wip-us.apache.org/repos/asf/flink/blob/ab2895fa/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index ffb43f8..f68b141 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -112,11 +112,11 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 		String fsStateHandlePath = tmp.getRoot().getPath();
 
 		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
-		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@ha.zookeeper.quorum=" +
+		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
 			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +
 			"@@" + FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY + "=" + fsStateHandlePath + "/checkpoints" +
-			"@@" + ConfigConstants.STATE_BACKEND_FS_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
+			"@@" + ConfigConstants.ZOOKEEPER_RECOVERY_PATH + "=" + fsStateHandlePath + "/recovery");
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
 		AbstractFlinkYarnCluster yarnCluster = null;


[2/5] flink git commit: [FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests

Posted by fh...@apache.org.
[FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests

Rename Match*Test to Join*Test and MapTaskTest to FlatMapTaskTest

This closes #1294


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3c8a6588
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3c8a6588
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3c8a6588

Branch: refs/heads/master
Commit: 3c8a6588ac9bd35984d6e3b1eef916461a262fe4
Parents: 7ff071f
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 22 21:10:41 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 13:01:37 2015 +0200

----------------------------------------------------------------------
 .../operators/CombineTaskExternalITCase.java    |   16 +-
 .../flink/runtime/operators/CrossTaskTest.java  |   41 +-
 .../runtime/operators/DataSinkTaskTest.java     |   52 +-
 .../runtime/operators/DataSourceTaskTest.java   |   16 +-
 .../runtime/operators/FlatMapTaskTest.java      |  149 +++
 .../operators/JoinTaskExternalITCase.java       |  165 +++
 .../flink/runtime/operators/JoinTaskTest.java   | 1017 +++++++++++++++++
 .../flink/runtime/operators/MapTaskTest.java    |  151 ---
 .../operators/MatchTaskExternalITCase.java      |  167 ---
 .../flink/runtime/operators/MatchTaskTest.java  | 1019 ------------------
 .../operators/ReduceTaskExternalITCase.java     |   16 +-
 .../flink/runtime/operators/ReduceTaskTest.java |   19 +-
 .../operators/chaining/ChainTaskTest.java       |   25 +-
 .../operators/testutils/TaskTestBase.java       |   18 +-
 14 files changed, 1428 insertions(+), 1443 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 4905e57..800bca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
 import java.util.HashMap;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
 import org.apache.flink.api.common.typeutils.record.RecordComparator;
 import org.apache.flink.runtime.operators.testutils.DriverTestBase;
 import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -42,7 +42,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 
 	private final double combine_frac;
 	
-	private final ArrayList<Record> outList = new ArrayList<Record>();
+	private final ArrayList<Record> outList = new ArrayList<>();
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator = new RecordComparator(
@@ -69,7 +69,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
-		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();
 		
 		try {
 			testDriver(testTask, MockCombiningReduceStub.class);
@@ -85,7 +85,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		
 		// wee need to do the final aggregation manually in the test, because the
 		// combiner is not guaranteed to do that
-		final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
+		final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
 		for (Record record : this.outList) {
 			IntValue key = new IntValue();
 			IntValue value = new IntValue();
@@ -123,7 +123,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		getTaskConfig().setRelativeMemoryDriver(combine_frac);
 		getTaskConfig().setFilehandlesDriver(2);
 		
-		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
+		final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();
 
 		try {
 			testDriver(testTask, MockCombiningReduceStub.class);
@@ -139,7 +139,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		
 		// wee need to do the final aggregation manually in the test, because the
 		// combiner is not guaranteed to do that
-		final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
+		final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
 		for (Record record : this.outList) {
 			IntValue key = new IntValue();
 			IntValue value = new IntValue();
@@ -166,7 +166,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 	// ------------------------------------------------------------------------
 	// ------------------------------------------------------------------------
 
-	@ReduceOperator.Combinable
+	@Combinable
 	public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 
@@ -194,7 +194,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
 		}
 	}
 
-	@ReduceOperator.Combinable
+	@Combinable
 	public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
index 4c27a68..2d8838a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.types.Record;
 import org.junit.Test;
 
-@SuppressWarnings("deprecation")
 public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>> {
 	
 	private static final long CROSS_MEM = 1024 * 1024;
@@ -65,7 +64,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -95,7 +94,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -123,7 +122,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingCrossStub.class);
@@ -153,7 +152,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingCrossStub.class);
@@ -184,7 +183,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -215,7 +214,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -243,7 +242,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingCrossStub.class);
@@ -272,7 +271,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockFailingCrossStub.class);
@@ -303,7 +302,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -333,7 +332,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -363,7 +362,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -393,7 +392,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		try {
 			testDriver(testTask, MockCrossStub.class);
@@ -420,7 +419,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -463,7 +462,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -485,7 +484,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		
 		try {
 			tct.join();
-			taskRunner.join();		
+			taskRunner.join();
 		} catch(InterruptedException ie) {
 			Assert.fail("Joining threads failed");
 		}
@@ -506,7 +505,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -549,7 +548,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
 		getTaskConfig().setRelativeMemoryDriver(cross_frac);
 		
-		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+		final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
 		
 		final AtomicBoolean success = new AtomicBoolean(false);
 		
@@ -571,7 +570,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		
 		try {
 			tct.join();
-			taskRunner.join();		
+			taskRunner.join();
 		} catch(InterruptedException ie) {
 			Assert.fail("Joining threads failed");
 		}
@@ -579,7 +578,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		Assert.assertTrue("Exception was thrown despite proper canceling.", success.get());
 	}
 	
-	public static final class MockCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
+	public static final class MockCrossStub implements CrossFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -588,7 +587,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
 		}
 	}
 	
-	public static final class MockFailingCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
+	public static final class MockFailingCrossStub implements CrossFunction<Record, Record, Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private int cnt = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index e91d338..b741b64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.operators;
 
+import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.junit.After;
 import org.junit.Assert;
@@ -81,7 +80,7 @@ public class DataSinkTaskTest extends TaskTestBase
 			super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 			super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
-			DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+			DataSinkTask<Record> testTask = new DataSinkTask<>();
 
 			super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
 
@@ -94,7 +93,7 @@ public class DataSinkTaskTest extends TaskTestBase
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
 
-			HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+			HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
 
 			while (br.ready()) {
 				String line = br.readLine();
@@ -144,7 +143,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false);
 		readers[3] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 3, 0, false), 0, false);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>();
 
 		super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
 
@@ -171,7 +170,7 @@ public class DataSinkTaskTest extends TaskTestBase
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
 
-			HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+			HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
 
 			while(br.ready()) {
 				String line = br.readLine();
@@ -219,12 +218,12 @@ public class DataSinkTaskTest extends TaskTestBase
 
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>();
 
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), 0);
+				new RecordComparatorFactory(new int[]{1},(new Class[]{IntValue.class})), 0);
 		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -248,7 +247,7 @@ public class DataSinkTaskTest extends TaskTestBase
 			fr = new FileReader(tempTestFile);
 			br = new BufferedReader(fr);
 
-			Set<Integer> keys = new HashSet<Integer>();
+			Set<Integer> keys = new HashSet<>();
 
 			int curVal = -1;
 			while(br.ready()) {
@@ -297,7 +296,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
@@ -323,21 +322,20 @@ public class DataSinkTaskTest extends TaskTestBase
 	public void testFailingSortingDataSinkTask() {
 
 		int keyCnt = 100;
-		int valCnt = 20;;
+		int valCnt = 20;
 		double memoryFraction = 1.0;
 
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
 
-		DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		DataSinkTask<Record> testTask = new DataSinkTask<>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1}, ((Class<? extends Key<?>>[]) new Class[]{IntValue.class})),
-				0);
+				new RecordComparatorFactory(new int[]{1}, ( new Class[]{IntValue.class})), 0);
 		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -365,7 +363,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 
-		final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		final DataSinkTask<Record> testTask = new DataSinkTask<>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
@@ -407,15 +405,14 @@ public class DataSinkTaskTest extends TaskTestBase
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addInput(new InfiniteInputIterator(), 0);
 
-		final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+		final DataSinkTask<Record> testTask = new DataSinkTask<>();
 		Configuration stubParams = new Configuration();
 		super.getTaskConfig().setStubParameters(stubParams);
 
 		// set sorting
 		super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
 		super.getTaskConfig().setInputComparator(
-				new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})),
-				0);
+				new RecordComparatorFactory(new int[]{1},(new Class[]{IntValue.class})), 0);
 		super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
 		super.getTaskConfig().setFilehandlesInput(0, 8);
 		super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -447,7 +444,7 @@ public class DataSinkTaskTest extends TaskTestBase
 
 	}
 
-	public static class MockOutputFormat extends DelimitedOutputFormat {
+	public static class MockOutputFormat extends FileOutputFormat<Record> {
 		private static final long serialVersionUID = 1L;
 
 		final StringBuilder bld = new StringBuilder();
@@ -458,8 +455,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		}
 
 		@Override
-		public int serializeRecord(Record rec, byte[] target) throws Exception
-		{
+		public void writeRecord(Record rec) throws IOException {
 			IntValue key = rec.getField(0, IntValue.class);
 			IntValue value = rec.getField(1, IntValue.class);
 
@@ -467,14 +463,11 @@ public class DataSinkTaskTest extends TaskTestBase
 			this.bld.append(key.getValue());
 			this.bld.append('_');
 			this.bld.append(value.getValue());
+			this.bld.append('\n');
 
 			byte[] bytes = this.bld.toString().getBytes();
-			if (bytes.length <= target.length) {
-				System.arraycopy(bytes, 0, target, 0, bytes.length);
-				return bytes.length;
-			}
-			// else
-			return -bytes.length;
+
+			this.stream.write(bytes);
 		}
 
 	}
@@ -490,12 +483,11 @@ public class DataSinkTaskTest extends TaskTestBase
 		}
 
 		@Override
-		public int serializeRecord(Record rec, byte[] target) throws Exception
-		{
+		public void writeRecord(Record rec) throws IOException {
 			if (++this.cnt >= 10) {
 				throw new RuntimeException("Expected Test Exception");
 			}
-			return super.serializeRecord(rec, target);
+			super.writeRecord(rec);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 4548410..96ae700 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -28,11 +28,11 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 
+import org.apache.flink.api.common.io.DelimitedInputFormat;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.junit.Assert;
 
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
 import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
 import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -83,7 +83,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addOutput(this.outList);
 		
-		DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+		DataSourceTask<Record> testTask = new DataSourceTask<>();
 		
 		super.registerFileInputTask(testTask, MockInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
 		
@@ -97,7 +97,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		Assert.assertTrue("Invalid output size. Expected: "+(keyCnt*valCnt)+" Actual: "+this.outList.size(),
 			this.outList.size() == keyCnt * valCnt);
 		
-		HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+		HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
 		
 		for (Record kvp : this.outList) {
 			
@@ -138,7 +138,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 		super.addOutput(this.outList);
 		
-		DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+		DataSourceTask<Record> testTask = new DataSourceTask<>();
 
 		super.registerFileInputTask(testTask, MockFailingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
 		
@@ -172,7 +172,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 			Assert.fail("Unable to set-up test input file");
 		}
 		
-		final DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+		final DataSourceTask<Record> testTask = new DataSourceTask<>();
 
 		super.registerFileInputTask(testTask, MockDelayingInputFormat.class,  new File(tempTestPath).toURI().toString(), "\n");
 		
@@ -232,7 +232,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		}
 	}
 	
-	public static class MockInputFormat extends DelimitedInputFormat {
+	public static class MockInputFormat extends DelimitedInputFormat<Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue key = new IntValue();
@@ -257,7 +257,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		}
 	}
 	
-	public static class MockDelayingInputFormat extends DelimitedInputFormat {
+	public static class MockDelayingInputFormat extends DelimitedInputFormat<Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue key = new IntValue();
@@ -289,7 +289,7 @@ public class DataSourceTaskTest extends TaskTestBase {
 		
 	}
 	
-	public static class MockFailingInputFormat extends DelimitedInputFormat {
+	public static class MockFailingInputFormat extends DelimitedInputFormat<Record> {
 		private static final long serialVersionUID = 1L;
 		
 		private final IntValue key = new IntValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
new file mode 100644
index 0000000..3d7f99e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
+import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlatMapTaskTest extends DriverTestBase<FlatMapFunction<Record, Record>> {
+	
+	private static final Logger LOG = LoggerFactory.getLogger(FlatMapTaskTest.class);
+	
+	private final CountingOutputCollector output = new CountingOutputCollector();
+	
+	
+	public FlatMapTaskTest(ExecutionConfig config) {
+		super(config, 0, 0);
+	}
+	
+	@Test
+	public void testMapTask() {
+		final int keyCnt = 100;
+		final int valCnt = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		setOutput(this.output);
+		
+		final FlatMapDriver<Record, Record> testDriver = new FlatMapDriver<>();
+		
+		try {
+			testDriver(testDriver, MockMapStub.class);
+		} catch (Exception e) {
+			LOG.debug("Exception while running the test driver.", e);
+			Assert.fail("Invoke method caused exception.");
+		}
+		
+		Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, this.output.getNumberOfRecords());
+	}
+	
+	@Test
+	public void testFailingMapTask() {
+		final int keyCnt = 100;
+		final int valCnt = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		setOutput(new DiscardingOutputCollector<Record>());
+		
+		final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>();
+		try {
+			testDriver(testTask, MockFailingMapStub.class);
+			Assert.fail("Function exception was not forwarded.");
+		} catch (ExpectedTestException e) {
+			// good!
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Exception in test.");
+		}
+	}
+	
+	@Test
+	public void testCancelMapTask() {
+		addInput(new InfiniteInputIterator());
+		setOutput(new DiscardingOutputCollector<Record>());
+		
+		final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>();
+		
+		final AtomicBoolean success = new AtomicBoolean(false);
+		
+		final Thread taskRunner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockMapStub.class);
+					success.set(true);
+				} catch (Exception ie) {
+					ie.printStackTrace();
+				}
+			}
+		};
+		taskRunner.start();
+		
+		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+		tct.start();
+		
+		try {
+			tct.join();
+			taskRunner.join();		
+		} catch(InterruptedException ie) {
+			Assert.fail("Joining threads failed");
+		}
+		
+		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+	}
+	
+	public static class MockMapStub extends RichFlatMapFunction<Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void flatMap(Record record, Collector<Record> out) throws Exception {
+			out.collect(record);
+		}
+		
+	}
+	
+	public static class MockFailingMapStub extends RichFlatMapFunction<Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		private int cnt = 0;
+		
+		@Override
+		public void flatMap(Record record, Collector<Record> out) throws Exception {
+			if (++this.cnt >= 10) {
+				throw new ExpectedTestException();
+			}
+			out.collect(record);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
new file mode 100644
index 0000000..5b2e6eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.junit.Assert;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+public class JoinTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+	
+	private static final long HASH_MEM = 4*1024*1024;
+	
+	private static final long SORT_MEM = 3*1024*1024;
+	
+	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
+
+	private final double bnljn_frac;
+
+	private final double hash_frac;
+
+	@SuppressWarnings("unchecked")
+	private final RecordComparator comparator1 = new RecordComparator(
+		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+	
+	@SuppressWarnings("unchecked")
+	private final RecordComparator comparator2 = new RecordComparator(
+		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+	
+	private final CountingOutputCollector output = new CountingOutputCollector();
+	
+	public JoinTaskExternalITCase(ExecutionConfig config) {
+		super(config, HASH_MEM, 2, SORT_MEM);
+		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
+	}
+	
+	@Test
+	public void testExternalSort1MatchTask() {
+		final int keyCnt1 = 16384*4;
+		final int valCnt1 = 2;
+		
+		final int keyCnt2 = 8192;
+		final int valCnt2 = 4*2;
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		setOutput(this.output);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+	}
+	
+	@Test
+	public void testExternalHash1MatchTask() {
+		final int keyCnt1 = 32768;
+		final int valCnt1 = 8;
+		
+		final int keyCnt2 = 65536;
+		final int valCnt2 = 8;
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.output);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+	}
+	
+	@Test
+	public void testExternalHash2MatchTask() {
+		final int keyCnt1 = 32768;
+		final int valCnt1 = 8;
+		
+		final int keyCnt2 = 65536;
+		final int valCnt2 = 8;
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.output);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+	}
+	
+	public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
+			out.collect(value1);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
new file mode 100644
index 0000000..ecde59e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
@@ -0,0 +1,1017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
+import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class JoinTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+	
+	private static final long HASH_MEM = 6*1024*1024;
+	
+	private static final long SORT_MEM = 3*1024*1024;
+
+	private static final int NUM_SORTER = 2;
+	
+	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
+
+	private final double bnljn_frac;
+
+	private final double hash_frac;
+	
+	@SuppressWarnings("unchecked")
+	private final RecordComparator comparator1 = new RecordComparator(
+		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+	
+	@SuppressWarnings("unchecked")
+	private final RecordComparator comparator2 = new RecordComparator(
+		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+	
+	private final List<Record> outList = new ArrayList<>();
+	
+	
+	public JoinTaskTest(ExecutionConfig config) {
+		super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
+		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
+	}
+	
+	
+	@Test
+	public void testSortBoth1MatchTask() {
+		final int keyCnt1 = 20;
+		final int valCnt1 = 1;
+		
+		final int keyCnt2 = 10;
+		final int valCnt2 = 2;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testSortBoth2MatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 1;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortBoth3MatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortBoth4MatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 1;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortBoth5MatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortFirstMatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+			addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testSortSecondMatchTask() {
+
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
+			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testMergeMatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(this.outList);
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+		
+		int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		
+		Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
+		
+		this.outList.clear();
+		
+	}
+	
+	@Test
+	public void testFailingMatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		setOutput(new NirvanaOutputList());
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+		setNumFileHandlesForSort(4);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
+		
+		try {
+			testDriver(testTask, MockFailingMatchStub.class);
+			Assert.fail("Driver did not forward Exception.");
+		} catch (ExpectedTestException e) {
+			// good!
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("The test caused an exception.");
+		}
+	}
+	
+	@Test
+	public void testCancelMatchTaskWhileSort1() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+		
+		try {
+			setOutput(new NirvanaOutputList());
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+			setNumFileHandlesForSort(4);
+			
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+			
+			try {
+				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+			} catch (Exception e) {
+				e.printStackTrace();
+				Assert.fail("The test caused an exception.");
+			}
+	
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+
+			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+
+			cancel();
+			taskRunner.interrupt();
+
+			taskRunner.join(60000);
+
+			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+			Throwable taskError = error.get();
+			if (taskError != null) {
+				taskError.printStackTrace();
+				fail("Error in task while canceling: " + taskError.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCancelMatchTaskWhileSort2() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+		
+		try {
+			setOutput(new NirvanaOutputList());
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+			setNumFileHandlesForSort(4);
+			
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+			
+			try {
+				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+			} catch (Exception e) {
+				e.printStackTrace();
+				Assert.fail("The test caused an exception.");
+			}
+
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+
+			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+
+			cancel();
+			taskRunner.interrupt();
+
+			taskRunner.join(60000);
+
+			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+			Throwable taskError = error.get();
+			if (taskError != null) {
+				taskError.printStackTrace();
+				fail("Error in task while canceling: " + taskError.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testCancelMatchTaskWhileMatching() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+		
+		try {
+			setOutput(new NirvanaOutputList());
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+			getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+			setNumFileHandlesForSort(4);
+			
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+			
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+			
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+			
+			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockDelayingMatchStub.class);
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
+				}
+			};
+			taskRunner.start();
+			
+			Thread.sleep(1000);
+			
+			cancel();
+			taskRunner.interrupt();
+			
+			taskRunner.join(60000);
+			
+			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+			
+			Throwable taskError = error.get();
+			if (taskError != null) {
+				taskError.printStackTrace();
+				fail("Error in task while canceling: " + taskError.getMessage());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testHash1MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 10;
+		int valCnt2 = 2;
+				
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testHash2MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 1;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testHash3MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 1;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testHash4MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 1;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testHash5MatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(this.outList);
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockMatchStub.class);
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+		
+		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+		Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+		this.outList.clear();
+	}
+	
+	@Test
+	public void testFailingHashFirstMatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(new NirvanaOutputList());
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockFailingMatchStub.class);
+			Assert.fail("Function exception was not forwarded.");
+		} catch (ExpectedTestException etex) {
+			// good!
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+	}
+	
+	@Test
+	public void testFailingHashSecondMatchTask() {
+		int keyCnt1 = 20;
+		int valCnt1 = 20;
+		
+		int keyCnt2 = 20;
+		int valCnt2 = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(new NirvanaOutputList());
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		try {
+			testDriver(testTask, MockFailingMatchStub.class);
+			Assert.fail("Function exception was not forwarded.");
+		} catch (ExpectedTestException etex) {
+			// good!
+		} catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("Test caused an exception.");
+		}
+	}
+	
+	@Test
+	public void testCancelHashMatchTaskWhileBuildFirst() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+
+		try {
+			addInput(new DelayingInfinitiveInputIterator(100));
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+			setOutput(new NirvanaOutputList());
+
+			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+			getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+			final AtomicBoolean success = new AtomicBoolean(false);
+
+			Thread taskRunner = new Thread() {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+						success.set(true);
+					} catch (Exception ie) {
+						ie.printStackTrace();
+					}
+				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+			cancel();
+
+			try {
+				taskRunner.join();
+			}
+			catch (InterruptedException ie) {
+				Assert.fail("Joining threads failed");
+			}
+
+			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testHashCancelMatchTaskWhileBuildSecond() {
+		final int keyCnt = 20;
+		final int valCnt = 20;
+
+		try {
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+			addInput(new DelayingInfinitiveInputIterator(100));
+
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+			setOutput(new NirvanaOutputList());
+
+			getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+			getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+			final AtomicBoolean success = new AtomicBoolean(false);
+
+			Thread taskRunner = new Thread() {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+						success.set(true);
+					} catch (Exception ie) {
+						ie.printStackTrace();
+					}
+				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+			cancel();
+
+			try {
+				taskRunner.join();
+			}
+			catch (InterruptedException ie) {
+				Assert.fail("Joining threads failed");
+			}
+
+			Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testHashFirstCancelMatchTaskWhileMatching() {
+		int keyCnt = 20;
+		int valCnt = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(new NirvanaOutputList());
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		final AtomicBoolean success = new AtomicBoolean(false);
+		
+		Thread taskRunner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockMatchStub.class);
+					success.set(true);
+				} catch (Exception ie) {
+					ie.printStackTrace();
+				}
+			}
+		};
+		taskRunner.start();
+		
+		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+		tct.start();
+		
+		try {
+			tct.join();
+			taskRunner.join();
+		} catch(InterruptedException ie) {
+			Assert.fail("Joining threads failed");
+		}
+		
+		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+	}
+	
+	@Test
+	public void testHashSecondCancelMatchTaskWhileMatching() {
+		int keyCnt = 20;
+		int valCnt = 20;
+		
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+		addDriverComparator(this.comparator1);
+		addDriverComparator(this.comparator2);
+		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+		setOutput(new NirvanaOutputList());
+		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+		getTaskConfig().setRelativeMemoryDriver(hash_frac);
+		
+		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+		
+		final AtomicBoolean success = new AtomicBoolean(false);
+		
+		Thread taskRunner = new Thread() {
+			@Override
+			public void run() {
+				try {
+					testDriver(testTask, MockMatchStub.class);
+					success.set(true);
+				} catch (Exception ie) {
+					ie.printStackTrace();
+				}
+			}
+		};
+		taskRunner.start();
+		
+		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+		tct.start();
+		
+		try {
+			tct.join();
+			taskRunner.join();
+		} catch(InterruptedException ie) {
+			Assert.fail("Joining threads failed");
+		}
+		
+		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+	}
+	
+	// =================================================================================================
+	
+	public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
+			out.collect(record1);
+		}
+	}
+	
+	public static final class MockFailingMatchStub implements FlatJoinFunction<Record, Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		private int cnt = 0;
+		
+		@Override
+		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
+			if (++this.cnt >= 10) {
+				throw new ExpectedTestException();
+			}
+			out.collect(record1);
+		}
+	}
+	
+	public static final class MockDelayingMatchStub implements FlatJoinFunction<Record, Record, Record> {
+		private static final long serialVersionUID = 1L;
+		
+		@Override
+		public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException e) {
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
deleted file mode 100644
index bfc6c44..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class MapTaskTest extends DriverTestBase<GenericCollectorMap<Record, Record>> {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(MapTaskTest.class);
-	
-	private final CountingOutputCollector output = new CountingOutputCollector();
-	
-	
-	public MapTaskTest(ExecutionConfig config) {
-		super(config, 0, 0);
-	}
-	
-	@Test
-	public void testMapTask() {
-		final int keyCnt = 100;
-		final int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		setOutput(this.output);
-		
-		final CollectorMapDriver<Record, Record> testDriver = new CollectorMapDriver<Record, Record>();
-		
-		try {
-			testDriver(testDriver, MockMapStub.class);
-		} catch (Exception e) {
-			LOG.debug("Exception while running the test driver.", e);
-			Assert.fail("Invoke method caused exception.");
-		}
-		
-		Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, this.output.getNumberOfRecords());
-	}
-	
-	@Test
-	public void testFailingMapTask() {
-		final int keyCnt = 100;
-		final int valCnt = 20;
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-		setOutput(new DiscardingOutputCollector<Record>());
-		
-		final CollectorMapDriver<Record, Record> testTask = new CollectorMapDriver<Record, Record>();
-		try {
-			testDriver(testTask, MockFailingMapStub.class);
-			Assert.fail("Function exception was not forwarded.");
-		} catch (ExpectedTestException e) {
-			// good!
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Exception in test.");
-		}
-	}
-	
-	@Test
-	public void testCancelMapTask() {
-		addInput(new InfiniteInputIterator());
-		setOutput(new DiscardingOutputCollector<Record>());
-		
-		final CollectorMapDriver<Record, Record> testTask = new CollectorMapDriver<Record, Record>();
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		final Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMapStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
-				}
-			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();		
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
-		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
-	}
-	
-	public static class MockMapStub extends MapFunction {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			out.collect(record);
-		}
-		
-	}
-	
-	public static class MockFailingMapStub extends MapFunction {
-		private static final long serialVersionUID = 1L;
-		
-		private int cnt = 0;
-		
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			if (++this.cnt >= 10) {
-				throw new ExpectedTestException();
-			}
-			out.collect(record);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/3c8a6588/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
deleted file mode 100644
index 6f7fb21..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.junit.Assert;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
-	
-	private static final long HASH_MEM = 4*1024*1024;
-	
-	private static final long SORT_MEM = 3*1024*1024;
-	
-	private static final long BNLJN_MEM = 10 * PAGE_SIZE;
-
-	private final double bnljn_frac;
-
-	private final double hash_frac;
-
-	@SuppressWarnings("unchecked")
-	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
-	
-	@SuppressWarnings("unchecked")
-	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
-	
-	private final CountingOutputCollector output = new CountingOutputCollector();
-	
-	public MatchTaskExternalITCase(ExecutionConfig config) {
-		super(config, HASH_MEM, 2, SORT_MEM);
-		bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
-		hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
-	}
-	
-	@Test
-	public void testExternalSort1MatchTask() {
-		final int keyCnt1 = 16384*4;
-		final int valCnt1 = 2;
-		
-		final int keyCnt2 = 8192;
-		final int valCnt2 = 4*2;
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		setOutput(this.output);
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
-			addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
-	}
-	
-	@Test
-	public void testExternalHash1MatchTask() {
-		final int keyCnt1 = 32768;
-		final int valCnt1 = 8;
-		
-		final int keyCnt2 = 65536;
-		final int valCnt2 = 8;
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.output);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
-	}
-	
-	@Test
-	public void testExternalHash2MatchTask() {
-		final int keyCnt1 = 32768;
-		final int valCnt1 = 8;
-		
-		final int keyCnt2 = 65536;
-		final int valCnt2 = 8;
-		
-		final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-		
-		addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
-		addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		setOutput(this.output);
-		getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-		getTaskConfig().setRelativeMemoryDriver(hash_frac);
-		
-		JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		try {
-			testDriver(testTask, MockMatchStub.class);
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test caused an exception.");
-		}
-		
-		Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
-	}
-	
-	public static final class MockMatchStub extends JoinFunction {
-		private static final long serialVersionUID = 1L;
-		
-		@Override
-		public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
-			out.collect(value1);
-		}
-	}
-}


[5/5] flink git commit: [FLINK-2895] Duplicate immutable object creation

Posted by fh...@apache.org.
[FLINK-2895] Duplicate immutable object creation

Operators defer object creation when object reuse is disabled.

This closes #1288


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bbb75c59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bbb75c59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bbb75c59

Branch: refs/heads/master
Commit: bbb75c599aba1fffa3f52b45af77ee9c7ece3ca0
Parents: d2e4a27
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Oct 22 09:31:09 2015 -0400
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 13:06:29 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/runtime/operators/AllReduceDriver.java  | 8 +++-----
 .../java/org/apache/flink/runtime/operators/NoOpDriver.java  | 4 +---
 .../org/apache/flink/runtime/operators/ReduceDriver.java     | 4 ++--
 3 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 06f22c5..1d35fdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -108,7 +108,6 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 		final MutableObjectIterator<T> input = this.input;
 		final TypeSerializer<T> serializer = this.serializer;
 
-
 		if (objectReuseEnabled) {
 			T val1 = serializer.createInstance();
 
@@ -123,14 +122,13 @@ public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 
 			this.taskContext.getOutputCollector().collect(val1);
 		} else {
-			T val1 = serializer.createInstance();
-
-			if ((val1 = input.next(val1)) == null) {
+			T val1;
+			if ((val1 = input.next()) == null) {
 				return;
 			}
 
 			T val2;
-			while (running && (val2 = input.next(serializer.createInstance())) != null) {
+			while (running && (val2 = input.next()) != null) {
 				val1 = stub.reduce(val1, val2);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
index fcd2716..428cfe4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/NoOpDriver.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 import org.slf4j.Logger;
@@ -87,8 +86,7 @@ public class NoOpDriver<T> implements Driver<AbstractRichFunction, T> {
 			}
 		} else {
 			T record;
-			TypeSerializer<T> serializer = this.taskContext.<T>getInputSerializer(0).getSerializer();
-			while (this.running && ((record = input.next(serializer.createInstance())) != null)) {
+			while (this.running && ((record = input.next()) != null)) {
 				output.collect(record);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bbb75c59/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
index 970441e..6a7c42c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/ReduceDriver.java
@@ -148,7 +148,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 				}
 			}
 		} else {
-			T value = input.next(serializer.createInstance());
+			T value = input.next();
 
 			// iterate over key groups
 			while (this.running && value != null) {
@@ -156,7 +156,7 @@ public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
 				T res = value;
 
 				// iterate within a key group
-				while ((value = input.next(serializer.createInstance())) != null) {
+				while ((value = input.next()) != null) {
 					if (comparator.equalToReference(value)) {
 						// same group, reduce
 						res = function.reduce(res, value);