You are viewing a plain-text version of this content; the canonical link to the original message was a hyperlink that is not preserved in this copy.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:17 UTC

[03/19] flink git commit: [FLINK-5497] [tests] Remove duplicated tests for hash tables

[FLINK-5497] [tests] Remove duplicated tests for hash tables

This closes #3089


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53134594
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53134594
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53134594

Branch: refs/heads/master
Commit: 53134594644407d0a3cd691b0e93ae09ff6c8102
Parents: 9f544d8
Author: Alexey Diomin <di...@gmail.com>
Authored: Tue Jan 10 22:04:41 2017 +0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Feb 18 19:19:34 2017 +0100

----------------------------------------------------------------------
 .../NonReusingReOpenableHashTableITCase.java    | 421 +-----------------
 .../hash/ReOpenableHashTableITCase.java         | 222 ++++++++++
 .../hash/ReOpenableHashTableTestBase.java       | 193 +++++++++
 .../hash/ReusingReOpenableHashTableITCase.java  | 429 +------------------
 4 files changed, 429 insertions(+), 836 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 576cbd4..6b4e170 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -19,190 +19,34 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
 import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
 import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
 
-import static org.junit.Assert.fail;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
 
 /**
  * Test specialized hash join that keeps the build side data (in memory and on hard disk)
  * This is used for iterative tasks.
  */
-public class NonReusingReOpenableHashTableITCase {
+public class NonReusingReOpenableHashTableITCase extends ReOpenableHashTableTestBase {
 
-	private static final int PAGE_SIZE = 8 * 1024;
-	private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
-
-	private static final long SEED1 = 561349061987311L;
-	private static final long SEED2 = 231434613412342L;
-
-	private static final int NUM_PROBES = 3; // number of reopenings of hash join
-
-	private final AbstractInvokable parentTask = new DummyInvokable();
-
-	private IOManager ioManager;
-	private MemoryManager memoryManager;
-
-	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
-	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
-	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
-	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
-
-
-
-
-	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-	private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
-	private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
-	private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
-	private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
-	private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
-
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	@Before
-	public void beforeTest() {
-		this.recordSerializer = TestData.getIntStringTupleSerializer();
-
-		this.record1Comparator = TestData.getIntStringTupleComparator();
-		this.record2Comparator = TestData.getIntStringTupleComparator();
-		this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
-
-		this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
-		this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
-		this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
-		this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
-		this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
-
-		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
-		this.ioManager = new IOManagerAsync();
-	}
-
-	@After
-	public void afterTest() {
-		if (this.ioManager != null) {
-			this.ioManager.shutdown();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
-			this.ioManager = null;
-		}
-
-		if (this.memoryManager != null) {
-			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
-				this.memoryManager.verifyEmpty());
-			this.memoryManager.shutdown();
-			this.memoryManager = null;
-		}
-	}
-
-
-	/**
-	 * Test behavior with overflow buckets (Overflow buckets must be initialized correctly
-	 * if the input is reopened again)
-	 */
-	@Test
-	public void testOverflow() {
-
-		int buildSize = 1000;
-		int probeSize = 1000;
-		try {
-			TupleGenerator bgen = new TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-			TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
-			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
-			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
-			doTest(buildInput,probeInput, bgen, pgen);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-
-	/**
-	 * Verify proper operation if the build side is spilled to disk.
-	 */
-	@Test
-	public void testDoubleProbeSpilling() {
-
-		int buildSize = 1000;
-		int probeSize = 1000;
-		try {
-			TupleGenerator bgen = new TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
-			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
-			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
-			doTest(buildInput,probeInput, bgen, pgen);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-
-	/**
-	 * This test case verifies that hybrid hash join is able to handle multiple probe phases
-	 * when the build side fits completely into memory.
-	 */
-	@Test
-	public void testDoubleProbeInMemory() {
-
-		int buildSize = 1000;
-		int probeSize = 1000;
-		try {
-			TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
-			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
-			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
-
-			doTest(buildInput,probeInput, bgen, pgen);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-
-	private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
+	protected void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
 		// collect expected data
-		final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = NonReusingHashJoinIteratorITCase.joinTuples(NonReusingHashJoinIteratorITCase.collectTupleData(buildInput), NonReusingHashJoinIteratorITCase.collectTupleData(probeInput));
+		final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), collectTupleData(probeInput));
 
 		final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
 		final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
@@ -259,260 +103,5 @@ public class NonReusingReOpenableHashTableITCase {
 		iterator.close();
 	}
 
-	//
-	//
-	//	Tests taken from HahTableITCase!
-	//
-	//
-
-	private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
-			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
-		MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
-		MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
-		MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
-		List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
-		probes.add(probe1);
-		probes.add(probe2);
-		probes.add(probe3);
-		return new UnionIterator<>(probes);
-	}
-
-	@Test
-	public void testSpillingHashJoinWithMassiveCollisions() throws IOException
-	{
-		// the following two values are known to have a hash-code collision on the initial level.
-		// we use them to make sure one partition grows over-proportionally large
-		final int REPEATED_VALUE_1 = 40559;
-		final int REPEATED_VALUE_2 = 92882;
-		final int REPEATED_VALUE_COUNT_BUILD = 200000;
-		final int REPEATED_VALUE_COUNT_PROBE = 5;
-
-		final int NUM_KEYS = 1000000;
-		final int BUILD_VALS_PER_KEY = 3;
-		final int PROBE_VALS_PER_KEY = 10;
-
-		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
-		builds.add(build1);
-		builds.add(build2);
-		builds.add(build3);
-		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
-
-
-
-
-		// allocate the memory for the HashTable
-		List<MemorySegment> memSegments;
-		try {
-			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
-		}
-		catch (MemoryAllocationException maex) {
-			fail("Memory for the Join could not be provided.");
-			return;
-		}
-
-		// create the map for validating the results
-		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-
-		// ----------------------------------------------------------------------------------------
-
-		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
-				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
-				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager, true);
-
-		for (int probe = 0; probe < NUM_PROBES; probe++) {
-			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-			if(probe == 0) {
-				join.open(buildInput, probeInput);
-			} else {
-				join.reopenProbe(probeInput);
-			}
-
-			Tuple2<Integer, Integer> record;
-			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
-
-			while (join.nextRecord()) {
-				long numBuildValues = 0;
-
-				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
-				Integer key = probeRec.f0;
-				
-				MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
-				if ((record = buildSide.next(recordReuse)) != null) {
-					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
-				}
-				else {
-					fail("No build side values found for a probe key.");
-				}
-				while ((record = buildSide.next(record)) != null) {
-					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
-				}
-
-				Long contained = map.get(key);
-				if (contained == null) {
-					contained = numBuildValues;
-				}
-				else {
-					contained = contained + numBuildValues;
-				}
-
-				map.put(key, contained);
-			}
-		}
-
-		join.close();
-
-		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
-		for (Entry<Integer, Long> entry : map.entrySet()) {
-			long val = entry.getValue();
-			int key = entry.getKey();
-
-			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
-							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
-			} else {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
-							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
-			}
-		}
-
-
-		// ----------------------------------------------------------------------------------------
-
-		this.memoryManager.release(join.getFreedMemory());
-	}
-
-	/*
-	 * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number
-	 * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer
-	 * fits into memory by itself and needs to be repartitioned in the recursion again.
-	 */
-	@Test
-	public void testSpillingHashJoinWithTwoRecursions() throws IOException
-	{
-		// the following two values are known to have a hash-code collision on the first recursion level.
-		// we use them to make sure one partition grows over-proportionally large
-		final int REPEATED_VALUE_1 = 40559;
-		final int REPEATED_VALUE_2 = 92882;
-		final int REPEATED_VALUE_COUNT_BUILD = 200000;
-		final int REPEATED_VALUE_COUNT_PROBE = 5;
-
-		final int NUM_KEYS = 1000000;
-		final int BUILD_VALS_PER_KEY = 3;
-		final int PROBE_VALS_PER_KEY = 10;
-
-		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
-		builds.add(build1);
-		builds.add(build2);
-		builds.add(build3);
-		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
-
-
-		// allocate the memory for the HashTable
-		List<MemorySegment> memSegments;
-		try {
-			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
-		}
-		catch (MemoryAllocationException maex) {
-			fail("Memory for the Join could not be provided.");
-			return;
-		}
-
-		// create the map for validating the results
-		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-
-		// ----------------------------------------------------------------------------------------
-
-		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
-				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
-				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager, true);
-		
-		for (int probe = 0; probe < NUM_PROBES; probe++) {
-			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-			if (probe == 0) {
-				join.open(buildInput, probeInput);
-			} else {
-				join.reopenProbe(probeInput);
-			}
-			Tuple2<Integer, Integer> record;
-			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
-
-			while (join.nextRecord()) {
-				long numBuildValues = 0;
-
-				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
-				Integer key = probeRec.f0;
-				
-				MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
-				if ((record = buildSide.next(recordReuse)) != null) {
-					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
-				}
-				else {
-					fail("No build side values found for a probe key.");
-				}
-				while ((record = buildSide.next(recordReuse)) != null) {
-					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
-				}
-
-				Long contained = map.get(key);
-				if (contained == null) {
-					contained = numBuildValues;
-				}
-				else {
-					contained = contained + numBuildValues;
-				}
-
-				map.put(key, contained);
-			}
-		}
-
-		join.close();
-		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
-		for (Entry<Integer, Long> entry : map.entrySet()) {
-			long val = entry.getValue();
-			int key = entry.getKey();
-
-			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
-							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
-			} else {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
-							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
-			}
-		}
-
-
-		// ----------------------------------------------------------------------------------------
-
-		this.memoryManager.release(join.getFreedMemory());
-	}
-
 
-	static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
-		Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
-		for(Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
-			List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
-			for(TupleMatch m : entry.getValue()) {
-				matches.add(m);
-			}
-			copy.put(entry.getKey(), matches);
-		}
-		return copy;
-	}
-	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
new file mode 100644
index 0000000..f667c87
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+
+public class ReOpenableHashTableITCase {
+
+	private static final int PAGE_SIZE = 8 * 1024;
+	private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
+
+	private static final int NUM_PROBES = 3; // number of reopenings of hash join
+
+	private IOManager ioManager;
+	private MemoryManager memoryManager;
+
+	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+	private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+	private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+	private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+	private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+	private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	@Before
+	public void beforeTest() {
+		this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+		this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+		this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
+
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+			if (!this.ioManager.isProperlyShutDown()) {
+				Assert.fail("I/O manager failed to properly shut down.");
+			}
+			this.ioManager = null;
+		}
+
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+				this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+	private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
+																		  final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
+		MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
+		probes.add(probe1);
+		probes.add(probe2);
+		probes.add(probe3);
+		return new UnionIterator<>(probes);
+	}
+
+	@Test
+	public void testSpillingHashJoinWithMassiveCollisions() throws IOException
+	{
+		// the following two values are known to have a hash-code collision on the initial level.
+		// we use them to make sure one partition grows over-proportionally large
+		final int REPEATED_VALUE_1 = 40559;
+		final int REPEATED_VALUE_2 = 92882;
+		final int REPEATED_VALUE_COUNT_BUILD = 200000;
+		final int REPEATED_VALUE_COUNT_PROBE = 5;
+
+		final int NUM_KEYS = 1000000;
+		final int BUILD_VALS_PER_KEY = 3;
+		final int PROBE_VALS_PER_KEY = 10;
+
+		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
+		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
+		builds.add(build1);
+		builds.add(build2);
+		builds.add(build3);
+		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
+
+
+
+
+		// allocate the memory for the HashTable
+		List<MemorySegment> memSegments;
+		try {
+			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
+		}
+		catch (MemoryAllocationException maex) {
+			fail("Memory for the Join could not be provided.");
+			return;
+		}
+
+		// create the map for validating the results
+		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
+
+		// ----------------------------------------------------------------------------------------
+
+		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
+			this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
+			this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
+			memSegments, ioManager, true);
+
+		for (int probe = 0; probe < NUM_PROBES; probe++) {
+			// create a probe input that gives 10 million pairs with 10 values sharing a key
+			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+			if(probe == 0) {
+				join.open(buildInput, probeInput);
+			} else {
+				join.reopenProbe(probeInput);
+			}
+
+			Tuple2<Integer, Integer> record;
+			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
+
+			while (join.nextRecord()) {
+				long numBuildValues = 0;
+
+				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+				Integer key = probeRec.f0;
+
+				MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
+				if ((record = buildSide.next(recordReuse)) != null) {
+					numBuildValues = 1;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
+				}
+				else {
+					fail("No build side values found for a probe key.");
+				}
+				while ((record = buildSide.next(recordReuse)) != null) {
+					numBuildValues++;
+					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
+				}
+
+				Long contained = map.get(key);
+				if (contained == null) {
+					contained = numBuildValues;
+				}
+				else {
+					contained = contained + numBuildValues;
+				}
+
+				map.put(key, contained);
+			}
+		}
+
+		join.close();
+		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
+		for (Map.Entry<Integer, Long> entry : map.entrySet()) {
+			long val = entry.getValue();
+			int key = entry.getKey();
+
+			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
+					(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
+			} else {
+				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
+					PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
+			}
+		}
+
+
+		// ----------------------------------------------------------------------------------------
+
+		this.memoryManager.release(join.getFreedMemory());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
new file mode 100644
index 0000000..c1b87b0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGeneratorIterator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Shared fixture and test cases for the re-openable hash table tests.
+ *
+ * <p>Creates serializers, comparators, a heap {@link MemoryManager} and an {@link IOManagerAsync}
+ * before each test, shuts them down afterwards (failing if memory was not returned), and runs three
+ * build/probe scenarios through the {@link #doTest} implementation supplied by each subclass.
+ */
+public abstract class ReOpenableHashTableTestBase {
+
+	protected static final int PAGE_SIZE = 8 * 1024;
+	protected static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 1000 pages.
+
+	protected static final long SEED1 = 561349061987311L;
+	protected static final long SEED2 = 231434613412342L;
+
+	protected static final int NUM_PROBES = 3; // number of reopenings of hash join
+
+	protected final AbstractInvokable parentTask = new DummyInvokable();
+
+	protected IOManager ioManager;
+	protected MemoryManager memoryManager;
+
+	protected TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+	protected TypeComparator<Tuple2<Integer, String>> record1Comparator;
+	protected TypeComparator<Tuple2<Integer, String>> record2Comparator;
+	protected TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
+
+	protected TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+	protected TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+	protected TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+	protected TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+	protected TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
+
+	@SuppressWarnings({"unchecked", "rawtypes"})
+	@Before
+	public void beforeTest() {
+		this.recordSerializer = TestData.getIntStringTupleSerializer();
+
+		this.record1Comparator = TestData.getIntStringTupleComparator();
+		this.record2Comparator = TestData.getIntStringTupleComparator();
+		this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
+
+		this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+		this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+		this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+		this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
+
+		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
+		this.ioManager = new IOManagerAsync();
+	}
+
+	@After
+	public void afterTest() {
+		if (this.ioManager != null) {
+			this.ioManager.shutdown();
+			if (!this.ioManager.isProperlyShutDown()) {
+				Assert.fail("I/O manager failed to properly shut down.");
+			}
+			this.ioManager = null;
+		}
+
+		// the test must have handed all memory back before the manager is shut down
+		if (this.memoryManager != null) {
+			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+				this.memoryManager.verifyEmpty());
+			this.memoryManager.shutdown();
+			this.memoryManager = null;
+		}
+	}
+
+	/**
+	 * Test behavior with overflow buckets (overflow buckets must be initialized correctly
+	 * if the input is reopened again).
+	 */
+	@Test
+	public void testOverflow() throws Exception {
+		runTest(new TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH),
+			new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH));
+	}
+
+	/**
+	 * Verify proper operation if the build side is spilled to disk.
+	 */
+	@Test
+	public void testDoubleProbeSpilling() throws Exception {
+		runTest(new TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH),
+			new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH));
+	}
+
+	/**
+	 * This test case verifies that hybrid hash join is able to handle multiple probe phases
+	 * when the build side fits completely into memory.
+	 */
+	@Test
+	public void testDoubleProbeInMemory() throws Exception {
+		runTest(new TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH),
+			new TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH));
+	}
+
+	/**
+	 * Wraps the given generators into build and probe iterators of size 1000 and passes them to
+	 * {@link #doTest}. Exceptions propagate, so JUnit reports the full stack trace and cause.
+	 */
+	private void runTest(TupleGenerator bgen, TupleGenerator pgen) throws Exception {
+		// all scenarios use the same input sizes; only the generator settings differ
+		final int buildSize = 1000;
+		final int probeSize = 1000;
+		final TupleGeneratorIterator buildInput = new TupleGeneratorIterator(bgen, buildSize);
+		final TupleGeneratorIterator probeInput = new TupleGeneratorIterator(pgen, probeSize);
+		doTest(buildInput, probeInput, bgen, pgen);
+	}
+
+	/**
+	 * Runs one build/probe scenario against the hash table variant under test.
+	 *
+	 * @param buildInput iterator producing the build-side records
+	 * @param probeInput iterator producing the probe-side records
+	 * @param bgen generator backing {@code buildInput}
+	 * @param pgen generator backing {@code probeInput}
+	 * @throws Exception any failure, which fails the calling test
+	 */
+	protected abstract void doTest(TupleGeneratorIterator buildInput, TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception;
+
+	/**
+	 * Copies the given map so that the per-key collections can be modified without affecting
+	 * the source map.
+	 *
+	 * <p>Note: despite the name, only the map and the per-key lists are copied; the
+	 * {@link TupleMatch} elements themselves are shared with the source map.
+	 *
+	 * @param expectedSecondMatchesMap the map to copy
+	 * @return a new map holding a new list for every key of the source map
+	 */
+	static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
+		Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
+		for (Map.Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+			List<TupleMatch> matches = new ArrayList<>(entry.getValue());
+			copy.put(entry.getKey(), matches);
+		}
+		return copy;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index 6afde16..af3a894 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -19,44 +19,21 @@
 
 package org.apache.flink.runtime.operators.hash;
 
-import static org.junit.Assert.fail;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
-import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
 
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
@@ -65,144 +42,9 @@ import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIterator
  * Test specialized hash join that keeps the build side data (in memory and on hard disk)
  * This is used for iterative tasks.
  */
-public class ReusingReOpenableHashTableITCase {
-	
-	private static final int PAGE_SIZE = 8 * 1024;
-	private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
+public class ReusingReOpenableHashTableITCase extends ReOpenableHashTableTestBase {
 
-	private static final long SEED1 = 561349061987311L;
-	private static final long SEED2 = 231434613412342L;
-	
-	private static final int NUM_PROBES = 3; // number of reopenings of hash join
-	
-	private final AbstractInvokable parentTask = new DummyInvokable();
-
-	private IOManager ioManager;
-	private MemoryManager memoryManager;
-	
-	private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
-	private TypeComparator<Tuple2<Integer, String>> record1Comparator;
-	private TypeComparator<Tuple2<Integer, String>> record2Comparator;
-	private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
-	
-	
-	
-	
-	private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
-	private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
-	private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
-	private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
-	private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
-	private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
-
-	@SuppressWarnings({"unchecked", "rawtypes"})
-	@Before
-	public void beforeTest() {
-		this.recordSerializer = TestData.getIntStringTupleSerializer();
-		
-		this.record1Comparator = TestData.getIntStringTupleComparator();
-		this.record2Comparator = TestData.getIntStringTupleComparator();
-		this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
-		
-		this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
-		this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
-		this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
-		this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
-		this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
-		
-		this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
-		this.ioManager = new IOManagerAsync();
-	}
-
-	@After
-	public void afterTest() {
-		if (this.ioManager != null) {
-			this.ioManager.shutdown();
-			if (!this.ioManager.isProperlyShutDown()) {
-				Assert.fail("I/O manager failed to properly shut down.");
-			}
-			this.ioManager = null;
-		}
-		
-		if (this.memoryManager != null) {
-			Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
-				this.memoryManager.verifyEmpty());
-			this.memoryManager.shutdown();
-			this.memoryManager = null;
-		}
-	}
-	
-	
-	/**
-	 * Test behavior with overflow buckets (Overflow buckets must be initialized correctly 
-	 * if the input is reopened again)
-	 */
-	@Test
-	public void testOverflow() {
-		
-		int buildSize = 1000;
-		int probeSize = 1000;
-		try {
-			TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
-			TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			
-			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
-			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
-			doTest(buildInput,probeInput, bgen, pgen);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-	
-	/**
-	 * Verify proper operation if the build side is spilled to disk.
-	 */
-	@Test
-	public void testDoubleProbeSpilling() {
-		
-		int buildSize = 1000;
-		int probeSize = 1000;
-		try {
-			TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			
-			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
-			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
-			doTest(buildInput,probeInput, bgen, pgen);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-	
-	/**
-	 * This test case verifies that hybrid hash join is able to handle multiple probe phases
-	 * when the build side fits completely into memory.
-	 */
-	@Test
-	public void testDoubleProbeInMemory() {
-		
-		int buildSize = 1000;
-		int probeSize = 1000;
-		try {
-			TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-			
-			final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
-			final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
-			
-			doTest(buildInput,probeInput, bgen, pgen);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("An exception occurred during the test: " + e.getMessage());
-		}
-	}
-	
-	private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception {
+	protected void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
 		// collect expected data
 		final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), collectTupleData(probeInput));
 		
@@ -260,257 +102,4 @@ public class ReusingReOpenableHashTableITCase {
 		
 		iterator.close();
 	}
-	
-	//
-	//
-	//	Tests taken from HahTableITCase!
-	//
-	//
-	
-	private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
-			final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
-		MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
-		MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
-		MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
-		List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
-		probes.add(probe1);
-		probes.add(probe2);
-		probes.add(probe3);
-		return new UnionIterator<>(probes);
-	}
-	
-	@Test
-	public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
-		// the following two values are known to have a hash-code collision on the initial level.
-		// we use them to make sure one partition grows over-proportionally large
-		final int REPEATED_VALUE_1 = 40559;
-		final int REPEATED_VALUE_2 = 92882;
-		final int REPEATED_VALUE_COUNT_BUILD = 200000;
-		final int REPEATED_VALUE_COUNT_PROBE = 5;
-		
-		final int NUM_KEYS = 1000000;
-		final int BUILD_VALS_PER_KEY = 3;
-		final int PROBE_VALS_PER_KEY = 10;
-		
-		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
-		builds.add(build1);
-		builds.add(build2);
-		builds.add(build3);
-		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
-
-		// allocate the memory for the HashTable
-		List<MemorySegment> memSegments;
-		try {
-			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
-		}
-		catch (MemoryAllocationException maex) {
-			fail("Memory for the Join could not be provided.");
-			return;
-		}
-		
-		// create the map for validating the results
-		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-		
-		// ----------------------------------------------------------------------------------------
-		
-		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
-				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
-				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager, true);
-		
-		for(int probe = 0; probe < NUM_PROBES; probe++) {
-			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-			if(probe == 0) {
-				join.open(buildInput, probeInput);
-			} else {
-				join.reopenProbe(probeInput);
-			}
-		
-			Tuple2<Integer, Integer> record;
-			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
-
-			while (join.nextRecord()) {
-				long numBuildValues = 0;
-		
-				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
-				Integer key = probeRec.f0;
-				
-				MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
-				if ((record = buildSide.next(recordReuse)) != null) {
-					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); 
-				}
-				else {
-					fail("No build side values found for a probe key.");
-				}
-				while ((record = buildSide.next(record)) != null) {
-					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
-				}
-				
-				Long contained = map.get(key);
-				if (contained == null) {
-					contained = numBuildValues;
-				}
-				else {
-					contained = contained + numBuildValues;
-				}
-				
-				map.put(key, contained);
-			}
-		}
-		
-		join.close();
-		
-		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
-		for (Map.Entry<Integer, Long> entry : map.entrySet()) {
-			long val = entry.getValue();
-			int key = entry.getKey();
-	
-			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
-							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
-			} else {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
-							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
-			}
-		}
-		
-		
-		// ----------------------------------------------------------------------------------------
-		
-		this.memoryManager.release(join.getFreedMemory());
-	}
-	
-	/*
-	 * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number
-	 * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer
-	 * fits into memory by itself and needs to be repartitioned in the recursion again.
-	 */
-	@Test
-	public void testSpillingHashJoinWithTwoRecursions() throws IOException
-	{
-		// the following two values are known to have a hash-code collision on the first recursion level.
-		// we use them to make sure one partition grows over-proportionally large
-		final int REPEATED_VALUE_1 = 40559;
-		final int REPEATED_VALUE_2 = 92882;
-		final int REPEATED_VALUE_COUNT_BUILD = 200000;
-		final int REPEATED_VALUE_COUNT_PROBE = 5;
-		
-		final int NUM_KEYS = 1000000;
-		final int BUILD_VALS_PER_KEY = 3;
-		final int PROBE_VALS_PER_KEY = 10;
-		
-		// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
-		MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
-		MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
-		MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
-		List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
-		builds.add(build1);
-		builds.add(build2);
-		builds.add(build3);
-		MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
-	
-
-		// allocate the memory for the HashTable
-		List<MemorySegment> memSegments;
-		try {
-			memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
-		}
-		catch (MemoryAllocationException maex) {
-			fail("Memory for the Join could not be provided.");
-			return;
-		}
-		
-		// create the map for validating the results
-		HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-		
-		// ----------------------------------------------------------------------------------------
-		
-		final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
-				this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, 
-				this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
-				memSegments, ioManager, true);
-		
-		for (int probe = 0; probe < NUM_PROBES; probe++) {
-			// create a probe input that gives 10 million pairs with 10 values sharing a key
-			MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
-			if(probe == 0) {
-				join.open(buildInput, probeInput);
-			} else {
-				join.reopenProbe(probeInput);
-			}
-			Tuple2<Integer, Integer> record;
-			final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
-
-			while (join.nextRecord())
-			{	
-				long numBuildValues = 0;
-				
-				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
-				Integer key = probeRec.f0;
-				
-				MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
-				if ((record = buildSide.next(recordReuse)) != null) {
-					numBuildValues = 1;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); 
-				}
-				else {
-					fail("No build side values found for a probe key.");
-				}
-				while ((record = buildSide.next(recordReuse)) != null) {
-					numBuildValues++;
-					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
-				}
-				
-				Long contained = map.get(key);
-				if (contained == null) {
-					contained = numBuildValues;
-				}
-				else {
-					contained = contained + numBuildValues;
-				}
-				
-				map.put(key, contained);
-			}
-		}
-		
-		join.close();
-		Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
-		for (Map.Entry<Integer, Long> entry : map.entrySet()) {
-			long val = entry.getValue();
-			int key = entry.getKey();
-	
-			if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
-							(PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
-			} else {
-				Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, 
-							PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
-			}
-		}
-		
-		
-		// ----------------------------------------------------------------------------------------
-		
-		this.memoryManager.release(join.getFreedMemory());
-	}
-	
-	
-	static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
-		Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
-		for(Map.Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
-			List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
-			for(TupleMatch m : entry.getValue()) {
-				matches.add(m);
-			}
-			copy.put(entry.getKey(), matches);
-		}
-		return copy;
-	}
 }