You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:17 UTC
[03/19] flink git commit: [FLINK-5497] [tests] Remove duplicated
tests for hash tables
[FLINK-5497] [tests] Remove duplicated tests for hash tables
This closes #3089
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/53134594
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/53134594
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/53134594
Branch: refs/heads/master
Commit: 53134594644407d0a3cd691b0e93ae09ff6c8102
Parents: 9f544d8
Author: Alexey Diomin <di...@gmail.com>
Authored: Tue Jan 10 22:04:41 2017 +0400
Committer: Stephan Ewen <se...@apache.org>
Committed: Sat Feb 18 19:19:34 2017 +0100
----------------------------------------------------------------------
.../NonReusingReOpenableHashTableITCase.java | 421 +-----------------
.../hash/ReOpenableHashTableITCase.java | 222 ++++++++++
.../hash/ReOpenableHashTableTestBase.java | 193 +++++++++
.../hash/ReusingReOpenableHashTableITCase.java | 429 +------------------
4 files changed, 429 insertions(+), 836 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 576cbd4..6b4e170 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -19,190 +19,34 @@
package org.apache.flink.runtime.operators.hash;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
-import static org.junit.Assert.fail;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
/**
* Test specialized hash join that keeps the build side data (in memory and on hard disk)
* This is used for iterative tasks.
*/
-public class NonReusingReOpenableHashTableITCase {
+public class NonReusingReOpenableHashTableITCase extends ReOpenableHashTableTestBase {
- private static final int PAGE_SIZE = 8 * 1024;
- private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
-
- private static final long SEED1 = 561349061987311L;
- private static final long SEED2 = 231434613412342L;
-
- private static final int NUM_PROBES = 3; // number of reopenings of hash join
-
- private final AbstractInvokable parentTask = new DummyInvokable();
-
- private IOManager ioManager;
- private MemoryManager memoryManager;
-
- private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
- private TypeComparator<Tuple2<Integer, String>> record1Comparator;
- private TypeComparator<Tuple2<Integer, String>> record2Comparator;
- private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
-
-
-
-
- private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
- private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
- private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
- private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
- private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
- private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Before
- public void beforeTest() {
- this.recordSerializer = TestData.getIntStringTupleSerializer();
-
- this.record1Comparator = TestData.getIntStringTupleComparator();
- this.record2Comparator = TestData.getIntStringTupleComparator();
- this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
-
- this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
- this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
- this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
- this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
- this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
-
- this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
- this.ioManager = new IOManagerAsync();
- }
-
- @After
- public void afterTest() {
- if (this.ioManager != null) {
- this.ioManager.shutdown();
- if (!this.ioManager.isProperlyShutDown()) {
- Assert.fail("I/O manager failed to properly shut down.");
- }
- this.ioManager = null;
- }
-
- if (this.memoryManager != null) {
- Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
- this.memoryManager.verifyEmpty());
- this.memoryManager.shutdown();
- this.memoryManager = null;
- }
- }
-
-
- /**
- * Test behavior with overflow buckets (Overflow buckets must be initialized correctly
- * if the input is reopened again)
- */
- @Test
- public void testOverflow() {
-
- int buildSize = 1000;
- int probeSize = 1000;
- try {
- TupleGenerator bgen = new TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
- TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
- final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
- final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
- doTest(buildInput,probeInput, bgen, pgen);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- /**
- * Verify proper operation if the build side is spilled to disk.
- */
- @Test
- public void testDoubleProbeSpilling() {
-
- int buildSize = 1000;
- int probeSize = 1000;
- try {
- TupleGenerator bgen = new TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
- final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
- final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
- doTest(buildInput,probeInput, bgen, pgen);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- /**
- * This test case verifies that hybrid hash join is able to handle multiple probe phases
- * when the build side fits completely into memory.
- */
- @Test
- public void testDoubleProbeInMemory() {
-
- int buildSize = 1000;
- int probeSize = 1000;
- try {
- TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
- final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
- final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
-
- doTest(buildInput,probeInput, bgen, pgen);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
+ protected void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
// collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = NonReusingHashJoinIteratorITCase.joinTuples(NonReusingHashJoinIteratorITCase.collectTupleData(buildInput), NonReusingHashJoinIteratorITCase.collectTupleData(probeInput));
+ final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), collectTupleData(probeInput));
final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
@@ -259,260 +103,5 @@ public class NonReusingReOpenableHashTableITCase {
iterator.close();
}
- //
- //
- // Tests taken from HahTableITCase!
- //
- //
-
- private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
- final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
- MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
- MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
- MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
- List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
- probes.add(probe1);
- probes.add(probe2);
- probes.add(probe3);
- return new UnionIterator<>(probes);
- }
-
- @Test
- public void testSpillingHashJoinWithMassiveCollisions() throws IOException
- {
- // the following two values are known to have a hash-code collision on the initial level.
- // we use them to make sure one partition grows over-proportionally large
- final int REPEATED_VALUE_1 = 40559;
- final int REPEATED_VALUE_2 = 92882;
- final int REPEATED_VALUE_COUNT_BUILD = 200000;
- final int REPEATED_VALUE_COUNT_PROBE = 5;
-
- final int NUM_KEYS = 1000000;
- final int BUILD_VALS_PER_KEY = 3;
- final int PROBE_VALS_PER_KEY = 10;
-
- // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
- MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
- MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
- MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
- List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
- builds.add(build1);
- builds.add(build2);
- builds.add(build3);
- MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
-
-
-
-
- // allocate the memory for the HashTable
- List<MemorySegment> memSegments;
- try {
- memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
- }
- catch (MemoryAllocationException maex) {
- fail("Memory for the Join could not be provided.");
- return;
- }
-
- // create the map for validating the results
- HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-
- // ----------------------------------------------------------------------------------------
-
- final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
- this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
- this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager, true);
-
- for (int probe = 0; probe < NUM_PROBES; probe++) {
- // create a probe input that gives 10 million pairs with 10 values sharing a key
- MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
- if(probe == 0) {
- join.open(buildInput, probeInput);
- } else {
- join.reopenProbe(probeInput);
- }
-
- Tuple2<Integer, Integer> record;
- final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
-
- while (join.nextRecord()) {
- long numBuildValues = 0;
-
- final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
- Integer key = probeRec.f0;
-
- MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
- if ((record = buildSide.next(recordReuse)) != null) {
- numBuildValues = 1;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
- }
- else {
- fail("No build side values found for a probe key.");
- }
- while ((record = buildSide.next(record)) != null) {
- numBuildValues++;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
- }
-
- Long contained = map.get(key);
- if (contained == null) {
- contained = numBuildValues;
- }
- else {
- contained = contained + numBuildValues;
- }
-
- map.put(key, contained);
- }
- }
-
- join.close();
-
- Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
- for (Entry<Integer, Long> entry : map.entrySet()) {
- long val = entry.getValue();
- int key = entry.getKey();
-
- if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
- Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
- (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
- } else {
- Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
- PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
- }
- }
-
-
- // ----------------------------------------------------------------------------------------
-
- this.memoryManager.release(join.getFreedMemory());
- }
-
- /*
- * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number
- * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer
- * fits into memory by itself and needs to be repartitioned in the recursion again.
- */
- @Test
- public void testSpillingHashJoinWithTwoRecursions() throws IOException
- {
- // the following two values are known to have a hash-code collision on the first recursion level.
- // we use them to make sure one partition grows over-proportionally large
- final int REPEATED_VALUE_1 = 40559;
- final int REPEATED_VALUE_2 = 92882;
- final int REPEATED_VALUE_COUNT_BUILD = 200000;
- final int REPEATED_VALUE_COUNT_PROBE = 5;
-
- final int NUM_KEYS = 1000000;
- final int BUILD_VALS_PER_KEY = 3;
- final int PROBE_VALS_PER_KEY = 10;
-
- // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
- MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
- MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
- MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
- List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
- builds.add(build1);
- builds.add(build2);
- builds.add(build3);
- MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
-
-
- // allocate the memory for the HashTable
- List<MemorySegment> memSegments;
- try {
- memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
- }
- catch (MemoryAllocationException maex) {
- fail("Memory for the Join could not be provided.");
- return;
- }
-
- // create the map for validating the results
- HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-
- // ----------------------------------------------------------------------------------------
-
- final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
- this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
- this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager, true);
-
- for (int probe = 0; probe < NUM_PROBES; probe++) {
- // create a probe input that gives 10 million pairs with 10 values sharing a key
- MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
- if (probe == 0) {
- join.open(buildInput, probeInput);
- } else {
- join.reopenProbe(probeInput);
- }
- Tuple2<Integer, Integer> record;
- final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
-
- while (join.nextRecord()) {
- long numBuildValues = 0;
-
- final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
- Integer key = probeRec.f0;
-
- MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
- if ((record = buildSide.next(recordReuse)) != null) {
- numBuildValues = 1;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
- }
- else {
- fail("No build side values found for a probe key.");
- }
- while ((record = buildSide.next(recordReuse)) != null) {
- numBuildValues++;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
- }
-
- Long contained = map.get(key);
- if (contained == null) {
- contained = numBuildValues;
- }
- else {
- contained = contained + numBuildValues;
- }
-
- map.put(key, contained);
- }
- }
-
- join.close();
- Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
- for (Entry<Integer, Long> entry : map.entrySet()) {
- long val = entry.getValue();
- int key = entry.getKey();
-
- if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
- Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
- (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
- } else {
- Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
- PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
- }
- }
-
-
- // ----------------------------------------------------------------------------------------
-
- this.memoryManager.release(join.getFreedMemory());
- }
-
- static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
- Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
- for(Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
- List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
- for(TupleMatch m : entry.getValue()) {
- matches.add(m);
- }
- copy.put(entry.getKey(), matches);
- }
- return copy;
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
new file mode 100644
index 0000000..f667c87
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryAllocationException;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.fail;
+
+public class ReOpenableHashTableITCase {
+
+ private static final int PAGE_SIZE = 8 * 1024;
+ private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 1000 pages of PAGE_SIZE bytes each.
+
+ private static final int NUM_PROBES = 3; // number of probe phases: one initial open plus two reopenings of the hash join
+
+ private IOManager ioManager;
+ private MemoryManager memoryManager;
+
+ private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
+ private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+ private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+ private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+ private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+ private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Before
+ public void beforeTest() {
+ this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+ this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+ this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+ this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+ this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
+
+ this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
+ this.ioManager = new IOManagerAsync();
+ }
+
+ @After
+ public void afterTest() {
+ if (this.ioManager != null) {
+ this.ioManager.shutdown();
+ if (!this.ioManager.isProperlyShutDown()) {
+ Assert.fail("I/O manager failed to properly shut down.");
+ }
+ this.ioManager = null;
+ }
+
+ if (this.memoryManager != null) {
+ Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+ this.memoryManager.verifyEmpty());
+ this.memoryManager.shutdown();
+ this.memoryManager = null;
+ }
+ }
+
+ private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
+ final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
+ MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
+ List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
+ probes.add(probe1);
+ probes.add(probe2);
+ probes.add(probe3);
+ return new UnionIterator<>(probes);
+ }
+
+ @Test
+ public void testSpillingHashJoinWithMassiveCollisions() throws IOException
+ {
+ // the following two values are known to have a hash-code collision on the initial level.
+ // we use them to make sure one partition grows over-proportionally large
+ final int REPEATED_VALUE_1 = 40559;
+ final int REPEATED_VALUE_2 = 92882;
+ final int REPEATED_VALUE_COUNT_BUILD = 200000;
+ final int REPEATED_VALUE_COUNT_PROBE = 5;
+
+ final int NUM_KEYS = 1000000;
+ final int BUILD_VALS_PER_KEY = 3;
+ final int PROBE_VALS_PER_KEY = 10;
+
+ // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
+ MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+ List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
+ builds.add(build1);
+ builds.add(build2);
+ builds.add(build3);
+ MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
+
+
+
+
+ // allocate the memory for the HashTable
+ List<MemorySegment> memSegments;
+ try {
+ memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
+ }
+ catch (MemoryAllocationException maex) {
+ fail("Memory for the Join could not be provided.");
+ return;
+ }
+
+ // create the map for validating the results
+ HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
+
+ // ----------------------------------------------------------------------------------------
+
+ final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
+ this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
+ this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
+ memSegments, ioManager, true);
+
+ for (int probe = 0; probe < NUM_PROBES; probe++) {
+ // create a probe input that gives 10 million pairs with 10 values sharing a key
+ MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+ if(probe == 0) {
+ join.open(buildInput, probeInput);
+ } else {
+ join.reopenProbe(probeInput);
+ }
+
+ Tuple2<Integer, Integer> record;
+ final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
+
+ while (join.nextRecord()) {
+ long numBuildValues = 0;
+
+ final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+ Integer key = probeRec.f0;
+
+ MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
+ if ((record = buildSide.next(recordReuse)) != null) {
+ numBuildValues = 1;
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
+ }
+ else {
+ fail("No build side values found for a probe key.");
+ }
+ while ((record = buildSide.next(recordReuse)) != null) {
+ numBuildValues++;
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
+ }
+
+ Long contained = map.get(key);
+ if (contained == null) {
+ contained = numBuildValues;
+ }
+ else {
+ contained = contained + numBuildValues;
+ }
+
+ map.put(key, contained);
+ }
+ }
+
+ join.close();
+ Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
+ for (Map.Entry<Integer, Long> entry : map.entrySet()) {
+ long val = entry.getValue();
+ int key = entry.getKey();
+
+ if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
+ Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
+ (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
+ } else {
+ Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
+ PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
+ }
+ }
+
+
+ // ----------------------------------------------------------------------------------------
+
+ this.memoryManager.release(join.getFreedMemory());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
new file mode 100644
index 0000000..c1b87b0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGeneratorIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import java.util.*;
+
+public abstract class ReOpenableHashTableTestBase {
+
+ protected static final int PAGE_SIZE = 8 * 1024;
+ protected static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 1000 pages.
+
+ protected static final long SEED1 = 561349061987311L;
+ protected static final long SEED2 = 231434613412342L;
+
+ protected static final int NUM_PROBES = 3; // number of reopenings of hash join
+
+ protected final AbstractInvokable parentTask = new DummyInvokable();
+
+ protected IOManager ioManager;
+ protected MemoryManager memoryManager;
+
+ protected TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+ protected TypeComparator<Tuple2<Integer, String>> record1Comparator;
+ protected TypeComparator<Tuple2<Integer, String>> record2Comparator;
+ protected TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
+
+ protected TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+ protected TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+ protected TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+ protected TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+ protected TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Before
+ public void beforeTest() {
+ this.recordSerializer = TestData.getIntStringTupleSerializer();
+
+ this.record1Comparator = TestData.getIntStringTupleComparator();
+ this.record2Comparator = TestData.getIntStringTupleComparator();
+ this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
+
+ this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+ this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+ this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+ this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+ this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
+
+ this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
+ this.ioManager = new IOManagerAsync();
+ }
+
+ @After
+ public void afterTest() {
+ if (this.ioManager != null) {
+ this.ioManager.shutdown();
+ if (!this.ioManager.isProperlyShutDown()) {
+ Assert.fail("I/O manager failed to properly shut down.");
+ }
+ this.ioManager = null;
+ }
+
+ if (this.memoryManager != null) {
+ Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+ this.memoryManager.verifyEmpty());
+ this.memoryManager.shutdown();
+ this.memoryManager = null;
+ }
+ }
+
+ /**
+ * Tests behavior with overflow buckets: overflow buckets must be initialized correctly
+ * when the input is reopened.
+ */
+ @Test
+ public void testOverflow() {
+
+ int buildSize = 1000;
+ int probeSize = 1000;
+ try {
+ TupleGenerator bgen = new TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+ TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+ final TupleGeneratorIterator buildInput = new TupleGeneratorIterator(bgen, buildSize);
+ final TupleGeneratorIterator probeInput = new TupleGeneratorIterator(pgen, probeSize);
+ doTest(buildInput,probeInput, bgen, pgen);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Verify proper operation if the build side is spilled to disk.
+ */
+ @Test
+ public void testDoubleProbeSpilling() {
+
+ int buildSize = 1000;
+ int probeSize = 1000;
+ try {
+ TupleGenerator bgen = new TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+ final TupleGeneratorIterator buildInput = new TupleGeneratorIterator(bgen, buildSize);
+ final TupleGeneratorIterator probeInput = new TupleGeneratorIterator(pgen, probeSize);
+ doTest(buildInput,probeInput, bgen, pgen);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ /**
+ * This test case verifies that hybrid hash join is able to handle multiple probe phases
+ * when the build side fits completely into memory.
+ */
+ @Test
+ public void testDoubleProbeInMemory() {
+
+ int buildSize = 1000;
+ int probeSize = 1000;
+ try {
+ TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+
+ final TupleGeneratorIterator buildInput = new TupleGeneratorIterator(bgen, buildSize);
+ final TupleGeneratorIterator probeInput = new TupleGeneratorIterator(pgen, probeSize);
+
+ doTest(buildInput,probeInput, bgen, pgen);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ abstract protected void doTest(TupleGeneratorIterator buildInput, TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception;
+
+ static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
+ Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
+ for(Map.Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+ List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
+ for(TupleMatch m : entry.getValue()) {
+ matches.add(m);
+ }
+ copy.put(entry.getKey(), matches);
+ }
+ return copy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53134594/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index 6afde16..af3a894 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -19,44 +19,21 @@
package org.apache.flink.runtime.operators.hash;
-import static org.junit.Assert.fail;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemoryType;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryAllocationException;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
-import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
@@ -65,144 +42,9 @@ import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIterator
* Test specialized hash join that keeps the build side data (in memory and on hard disk)
* This is used for iterative tasks.
*/
-public class ReusingReOpenableHashTableITCase {
-
- private static final int PAGE_SIZE = 8 * 1024;
- private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages.
+public class ReusingReOpenableHashTableITCase extends ReOpenableHashTableTestBase {
- private static final long SEED1 = 561349061987311L;
- private static final long SEED2 = 231434613412342L;
-
- private static final int NUM_PROBES = 3; // number of reopenings of hash join
-
- private final AbstractInvokable parentTask = new DummyInvokable();
-
- private IOManager ioManager;
- private MemoryManager memoryManager;
-
- private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
- private TypeComparator<Tuple2<Integer, String>> record1Comparator;
- private TypeComparator<Tuple2<Integer, String>> record2Comparator;
- private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
-
-
-
-
- private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
- private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
- private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
- private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
- private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
- private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- @Before
- public void beforeTest() {
- this.recordSerializer = TestData.getIntStringTupleSerializer();
-
- this.record1Comparator = TestData.getIntStringTupleComparator();
- this.record2Comparator = TestData.getIntStringTupleComparator();
- this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
-
- this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
- this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
- this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
- this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
- this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
-
- this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
- this.ioManager = new IOManagerAsync();
- }
-
- @After
- public void afterTest() {
- if (this.ioManager != null) {
- this.ioManager.shutdown();
- if (!this.ioManager.isProperlyShutDown()) {
- Assert.fail("I/O manager failed to properly shut down.");
- }
- this.ioManager = null;
- }
-
- if (this.memoryManager != null) {
- Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
- this.memoryManager.verifyEmpty());
- this.memoryManager.shutdown();
- this.memoryManager = null;
- }
- }
-
-
- /**
- * Test behavior with overflow buckets (Overflow buckets must be initialized correctly
- * if the input is reopened again)
- */
- @Test
- public void testOverflow() {
-
- int buildSize = 1000;
- int probeSize = 1000;
- try {
- TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
- TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
- final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
- final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
- doTest(buildInput,probeInput, bgen, pgen);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- /**
- * Verify proper operation if the build side is spilled to disk.
- */
- @Test
- public void testDoubleProbeSpilling() {
-
- int buildSize = 1000;
- int probeSize = 1000;
- try {
- TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
- final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
- final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
- doTest(buildInput,probeInput, bgen, pgen);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- /**
- * This test case verifies that hybrid hash join is able to handle multiple probe phases
- * when the build side fits completely into memory.
- */
- @Test
- public void testDoubleProbeInMemory() {
-
- int buildSize = 1000;
- int probeSize = 1000;
- try {
- TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
-
- final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
- final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
-
- doTest(buildInput,probeInput, bgen, pgen);
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception {
+ protected void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
// collect expected data
final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), collectTupleData(probeInput));
@@ -260,257 +102,4 @@ public class ReusingReOpenableHashTableITCase {
iterator.close();
}
-
- //
- //
- // Tests taken from HahTableITCase!
- //
- //
-
- private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
- final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
- MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
- MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
- MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
- List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
- probes.add(probe1);
- probes.add(probe2);
- probes.add(probe3);
- return new UnionIterator<>(probes);
- }
-
- @Test
- public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
- // the following two values are known to have a hash-code collision on the initial level.
- // we use them to make sure one partition grows over-proportionally large
- final int REPEATED_VALUE_1 = 40559;
- final int REPEATED_VALUE_2 = 92882;
- final int REPEATED_VALUE_COUNT_BUILD = 200000;
- final int REPEATED_VALUE_COUNT_PROBE = 5;
-
- final int NUM_KEYS = 1000000;
- final int BUILD_VALS_PER_KEY = 3;
- final int PROBE_VALS_PER_KEY = 10;
-
- // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
- MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
- MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
- MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
- List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
- builds.add(build1);
- builds.add(build2);
- builds.add(build3);
- MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
-
- // allocate the memory for the HashTable
- List<MemorySegment> memSegments;
- try {
- memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
- }
- catch (MemoryAllocationException maex) {
- fail("Memory for the Join could not be provided.");
- return;
- }
-
- // create the map for validating the results
- HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-
- // ----------------------------------------------------------------------------------------
-
- final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
- this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
- this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager, true);
-
- for(int probe = 0; probe < NUM_PROBES; probe++) {
- // create a probe input that gives 10 million pairs with 10 values sharing a key
- MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
- if(probe == 0) {
- join.open(buildInput, probeInput);
- } else {
- join.reopenProbe(probeInput);
- }
-
- Tuple2<Integer, Integer> record;
- final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
-
- while (join.nextRecord()) {
- long numBuildValues = 0;
-
- final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
- Integer key = probeRec.f0;
-
- MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
- if ((record = buildSide.next(recordReuse)) != null) {
- numBuildValues = 1;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
- }
- else {
- fail("No build side values found for a probe key.");
- }
- while ((record = buildSide.next(record)) != null) {
- numBuildValues++;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
- }
-
- Long contained = map.get(key);
- if (contained == null) {
- contained = numBuildValues;
- }
- else {
- contained = contained + numBuildValues;
- }
-
- map.put(key, contained);
- }
- }
-
- join.close();
-
- Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
- for (Map.Entry<Integer, Long> entry : map.entrySet()) {
- long val = entry.getValue();
- int key = entry.getKey();
-
- if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
- Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
- (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
- } else {
- Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
- PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
- }
- }
-
-
- // ----------------------------------------------------------------------------------------
-
- this.memoryManager.release(join.getFreedMemory());
- }
-
- /*
- * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number
- * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer
- * fits into memory by itself and needs to be repartitioned in the recursion again.
- */
- @Test
- public void testSpillingHashJoinWithTwoRecursions() throws IOException
- {
- // the following two values are known to have a hash-code collision on the first recursion level.
- // we use them to make sure one partition grows over-proportionally large
- final int REPEATED_VALUE_1 = 40559;
- final int REPEATED_VALUE_2 = 92882;
- final int REPEATED_VALUE_COUNT_BUILD = 200000;
- final int REPEATED_VALUE_COUNT_PROBE = 5;
-
- final int NUM_KEYS = 1000000;
- final int BUILD_VALS_PER_KEY = 3;
- final int PROBE_VALS_PER_KEY = 10;
-
- // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
- MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
- MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
- MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
- List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
- builds.add(build1);
- builds.add(build2);
- builds.add(build3);
- MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
-
-
- // allocate the memory for the HashTable
- List<MemorySegment> memSegments;
- try {
- memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896);
- }
- catch (MemoryAllocationException maex) {
- fail("Memory for the Join could not be provided.");
- return;
- }
-
- // create the map for validating the results
- HashMap<Integer, Long> map = new HashMap<Integer, Long>(NUM_KEYS);
-
- // ----------------------------------------------------------------------------------------
-
- final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
- this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
- this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
- memSegments, ioManager, true);
-
- for (int probe = 0; probe < NUM_PROBES; probe++) {
- // create a probe input that gives 10 million pairs with 10 values sharing a key
- MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
- if(probe == 0) {
- join.open(buildInput, probeInput);
- } else {
- join.reopenProbe(probeInput);
- }
- Tuple2<Integer, Integer> record;
- final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
-
- while (join.nextRecord())
- {
- long numBuildValues = 0;
-
- final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
- Integer key = probeRec.f0;
-
- MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
- if ((record = buildSide.next(recordReuse)) != null) {
- numBuildValues = 1;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
- }
- else {
- fail("No build side values found for a probe key.");
- }
- while ((record = buildSide.next(recordReuse)) != null) {
- numBuildValues++;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
- }
-
- Long contained = map.get(key);
- if (contained == null) {
- contained = numBuildValues;
- }
- else {
- contained = contained + numBuildValues;
- }
-
- map.put(key, contained);
- }
- }
-
- join.close();
- Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size());
- for (Map.Entry<Integer, Long> entry : map.entrySet()) {
- long val = entry.getValue();
- int key = entry.getKey();
-
- if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) {
- Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
- (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val);
- } else {
- Assert.assertEquals("Wrong number of values in per-key cross product for key " + key,
- PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val);
- }
- }
-
-
- // ----------------------------------------------------------------------------------------
-
- this.memoryManager.release(join.getFreedMemory());
- }
-
-
- static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
- Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
- for(Map.Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
- List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
- for(TupleMatch m : entry.getValue()) {
- matches.add(m);
- }
- copy.put(entry.getKey(), matches);
- }
- return copy;
- }
}