You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/15 15:07:23 UTC
[4/6] flink git commit: [FLINK-2479] Refactor runtime.operators.*
tests
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 86f879a..5a4fc6a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -22,10 +22,6 @@ package org.apache.flink.runtime.operators.hash;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -33,22 +29,16 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.RecordMatch;
-import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase
- .RecordMatchRemovingJoin;
+import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
@@ -63,6 +53,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import static org.junit.Assert.fail;
@@ -86,39 +80,35 @@ public class NonReusingReOpenableHashTableITCase {
private IOManager ioManager;
private MemoryManager memoryManager;
- private TypeSerializer<Record> recordSerializer;
- private TypeComparator<Record> record1Comparator;
- private TypeComparator<Record> record2Comparator;
- private TypePairComparator<Record, Record> recordPairComparator;
+ private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+ private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+ private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+ private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
- private TypeSerializer<Record> recordBuildSideAccesssor;
- private TypeSerializer<Record> recordProbeSideAccesssor;
- private TypeComparator<Record> recordBuildSideComparator;
- private TypeComparator<Record> recordProbeSideComparator;
- private TypePairComparator<Record, Record> pactRecordComparator;
+ private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+ private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+ private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+ private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+ private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
@SuppressWarnings({"unchecked", "rawtypes"})
@Before
public void beforeTest() {
- this.recordSerializer = RecordSerializer.get();
+ this.recordSerializer = TestData.getIntStringTupleSerializer();
- this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
- this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {Key.class});
- this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {Key.class});
+ this.record1Comparator = TestData.getIntStringTupleComparator();
+ this.record2Comparator = TestData.getIntStringTupleComparator();
+ this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
-
- final int[] keyPos = new int[] {0};
- final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class };
-
- this.recordBuildSideAccesssor = RecordSerializer.get();
- this.recordProbeSideAccesssor = RecordSerializer.get();
- this.recordBuildSideComparator = new RecordComparator(keyPos, keyType);
- this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
- this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
+ this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+ this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+ this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+ this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+ this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
this.ioManager = new IOManagerAsync();
@@ -153,11 +143,11 @@ public class NonReusingReOpenableHashTableITCase {
int buildSize = 1000;
int probeSize = 1000;
try {
- Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
- Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TupleGenerator bgen = new TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+ TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
- final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+ final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+ final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
doTest(buildInput,probeInput, bgen, pgen);
}
catch (Exception e) {
@@ -175,11 +165,11 @@ public class NonReusingReOpenableHashTableITCase {
int buildSize = 1000;
int probeSize = 1000;
try {
- Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TupleGenerator bgen = new TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
- final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+ final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+ final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
doTest(buildInput,probeInput, bgen, pgen);
}
catch (Exception e) {
@@ -198,11 +188,11 @@ public class NonReusingReOpenableHashTableITCase {
int buildSize = 1000;
int probeSize = 1000;
try {
- Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
- final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+ final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+ final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
doTest(buildInput,probeInput, bgen, pgen);
}
@@ -212,21 +202,21 @@ public class NonReusingReOpenableHashTableITCase {
}
}
- private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception {
+ private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
// collect expected data
- final Map<Key, Collection<RecordMatch>> expectedFirstMatchesMap = NonReusingHashMatchIteratorITCase.matchRecordValues(NonReusingHashMatchIteratorITCase.collectRecordData(buildInput), NonReusingHashMatchIteratorITCase.collectRecordData(probeInput));
+ final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = NonReusingHashMatchIteratorITCase.matchSecondTupleFields(NonReusingHashMatchIteratorITCase.collectTupleData(buildInput), NonReusingHashMatchIteratorITCase.collectTupleData(probeInput));
- final List<Map<Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
- final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES];
+ final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
+ final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
for(int i = 0; i < NUM_PROBES; i++) {
- Map<Key, Collection<RecordMatch>> tmp;
+ Map<Integer, Collection<TupleMatch>> tmp;
expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap));
- nMatcher[i] = new RecordMatchRemovingJoin(tmp);
+ nMatcher[i] = new TupleMatchRemovingJoin(tmp);
}
- final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap);
+ final FlatJoinFunction firstMatcher = new TupleMatchRemovingJoin(expectedFirstMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
bgen.reset();
@@ -235,8 +225,8 @@ public class NonReusingReOpenableHashTableITCase {
probeInput.reset();
// compare with iterator values
- NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator =
- new NonReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
+ NonReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new NonReusingBuildFirstReOpenableHashMatchIterator<>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -246,7 +236,7 @@ public class NonReusingReOpenableHashTableITCase {
while (iterator.callWithNextKey(firstMatcher, collector));
// assert that each expected match was seen for the first input
- for (Entry<Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedFirstMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -261,7 +251,7 @@ public class NonReusingReOpenableHashTableITCase {
while (iterator.callWithNextKey(nMatcher[i], collector));
// assert that each expected match was seen for the second input
- for (Entry<Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -277,16 +267,16 @@ public class NonReusingReOpenableHashTableITCase {
//
//
- private MutableObjectIterator<Record> getProbeInput(final int numKeys,
+ private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
- MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
- MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
- MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
- List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>();
+ MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
+ List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
probes.add(probe1);
probes.add(probe2);
probes.add(probe3);
- return new UnionIterator<Record>(probes);
+ return new UnionIterator<>(probes);
}
@Test
@@ -304,14 +294,14 @@ public class NonReusingReOpenableHashTableITCase {
final int PROBE_VALS_PER_KEY = 10;
// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
- MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
- MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
- MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
- List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+ MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+ List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
builds.add(build1);
builds.add(build2);
builds.add(build3);
- MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+ MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
@@ -331,40 +321,40 @@ public class NonReusingReOpenableHashTableITCase {
// ----------------------------------------------------------------------------------------
- final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+ final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
memSegments, ioManager, true);
for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
- MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if(probe == 0) {
join.open(buildInput, probeInput);
} else {
join.reopenProbe(probeInput);
}
- Record record;
- final Record recordReuse = new Record();
+ Tuple2<Integer, Integer> record;
+ final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
while (join.nextRecord()) {
long numBuildValues = 0;
- final Record probeRec = join.getCurrentProbeRecord();
- int key = probeRec.getField(0, IntValue.class).getValue();
+ final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+ Integer key = probeRec.f0;
- HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+ HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
}
else {
fail("No build side values found for a probe key.");
}
while ((record = buildSide.next(record)) != null) {
numBuildValues++;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
}
Long contained = map.get(key);
@@ -421,14 +411,14 @@ public class NonReusingReOpenableHashTableITCase {
final int PROBE_VALS_PER_KEY = 10;
// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
- MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
- MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
- MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
- List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+ MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+ List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
builds.add(build1);
builds.add(build2);
builds.add(build3);
- MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+ MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
// allocate the memory for the HashTable
@@ -446,39 +436,39 @@ public class NonReusingReOpenableHashTableITCase {
// ----------------------------------------------------------------------------------------
- final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+ final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
memSegments, ioManager, true);
for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
- MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if (probe == 0) {
join.open(buildInput, probeInput);
} else {
join.reopenProbe(probeInput);
}
- Record record;
- final Record recordReuse = new Record();
+ Tuple2<Integer, Integer> record;
+ final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
while (join.nextRecord()) {
long numBuildValues = 0;
- final Record probeRec = join.getCurrentProbeRecord();
- int key = probeRec.getField(0, IntValue.class).getValue();
+ final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+ Integer key = probeRec.f0;
- HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+ HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
}
else {
fail("No build side values found for a probe key.");
}
while ((record = buildSide.next(recordReuse)) != null) {
numBuildValues++;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
}
Long contained = map.get(key);
@@ -515,11 +505,11 @@ public class NonReusingReOpenableHashTableITCase {
}
- static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) {
- Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size());
- for(Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) {
- List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size());
- for(RecordMatch m : entry.getValue()) {
+ static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
+ Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
+ for(Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+ List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
+ for(TupleMatch m : entry.getValue()) {
matches.add(m);
}
copy.put(entry.getKey(), matches);
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
index 4fdff76..12f4a32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
@@ -28,13 +28,11 @@ import java.util.Map.Entry;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -44,16 +42,11 @@ import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.IntValue;
import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
@@ -77,31 +70,31 @@ public class ReusingHashMatchIteratorITCase {
private IOManager ioManager;
private MemoryManager memoryManager;
- private TypeSerializer<Record> recordSerializer;
- private TypeComparator<Record> record1Comparator;
- private TypeComparator<Record> record2Comparator;
- private TypePairComparator<Record, Record> recordPairComparator;
+ private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+ private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+ private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+ private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
private TypeSerializer<IntPair> pairSerializer;
private TypeComparator<IntPair> pairComparator;
- private TypePairComparator<IntPair, Record> pairRecordPairComparator;
- private TypePairComparator<Record, IntPair> recordPairPairComparator;
+ private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator;
+ private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator;
@SuppressWarnings("unchecked")
@Before
public void beforeTest() {
- this.recordSerializer = RecordSerializer.get();
+ this.recordSerializer = TestData.getIntStringTupleSerializer();
- this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
- this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+ this.record1Comparator = TestData.getIntStringTupleComparator();
+ this.record2Comparator = TestData.getIntStringTupleComparator();
- this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+ this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
this.pairSerializer = new IntPairSerializer();
- this.pairComparator = new IntPairComparator();
- this.pairRecordPairComparator = new IntPairRecordPairComparator();
- this.recordPairPairComparator = new RecordIntPairPairComparator();
+ this.pairComparator = new TestData.IntPairComparator();
+ this.pairRecordPairComparator = new IntPairTuplePairComparator();
+ this.recordPairPairComparator = new TupleIntPairPairComparator();
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
this.ioManager = new IOManagerAsync();
@@ -129,19 +122,19 @@ public class ReusingHashMatchIteratorITCase {
@Test
public void testBuildFirst() {
try {
- Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
- collectRecordData(input1),
- collectRecordData(input2));
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+ collectTupleData(input1),
+ collectTupleData(input2));
- final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
generator1.reset();
@@ -150,8 +143,8 @@ public class ReusingHashMatchIteratorITCase {
input2.reset();
// compare with iterator values
- ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
- new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+ ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstHashMatchIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -163,7 +156,7 @@ public class ReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -187,31 +180,31 @@ public class ReusingHashMatchIteratorITCase {
final int DUPLICATE_KEY = 13;
try {
- Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
- final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
- final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+ final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+ final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
- final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
inList1.add(gen1Iter);
inList1.add(const1Iter);
- final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
inList2.add(gen2Iter);
inList2.add(const2Iter);
- MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
- MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+ MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+ MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
// collect expected data
- final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
- collectRecordData(input1),
- collectRecordData(input2));
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+ collectTupleData(input1),
+ collectTupleData(input2));
// re-create the whole thing for actual processing
@@ -231,14 +224,14 @@ public class ReusingHashMatchIteratorITCase {
inList2.add(gen2Iter);
inList2.add(const2Iter);
- input1 = new UnionIterator<Record>(inList1);
- input2 = new UnionIterator<Record>(inList2);
+ input1 = new UnionIterator<Tuple2<Integer, String>>(inList1);
+ input2 = new UnionIterator<Tuple2<Integer, String>>(inList2);
- final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
- ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
- new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+ ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstHashMatchIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -250,7 +243,7 @@ public class ReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -265,19 +258,19 @@ public class ReusingHashMatchIteratorITCase {
@Test
public void testBuildSecond() {
try {
- Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
- collectRecordData(input1),
- collectRecordData(input2));
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+ collectTupleData(input1),
+ collectTupleData(input2));
- final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
generator1.reset();
@@ -286,8 +279,8 @@ public class ReusingHashMatchIteratorITCase {
input2.reset();
// compare with iterator values
- ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
- new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+ ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>,Tuple2<Integer, String> ,Tuple2<Integer, String> > iterator =
+ new ReusingBuildSecondHashMatchIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -299,7 +292,7 @@ public class ReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -323,31 +316,31 @@ public class ReusingHashMatchIteratorITCase {
final int DUPLICATE_KEY = 13;
try {
- Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
- final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
- final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+ final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+ final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
- final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
inList1.add(gen1Iter);
inList1.add(const1Iter);
- final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
inList2.add(gen2Iter);
inList2.add(const2Iter);
- MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
- MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+ MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+ MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
// collect expected data
- final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
- collectRecordData(input1),
- collectRecordData(input2));
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+ collectTupleData(input1),
+ collectTupleData(input2));
// re-create the whole thing for actual processing
@@ -367,14 +360,14 @@ public class ReusingHashMatchIteratorITCase {
inList2.add(gen2Iter);
inList2.add(const2Iter);
- input1 = new UnionIterator<Record>(inList1);
- input2 = new UnionIterator<Record>(inList2);
+ input1 = new UnionIterator<>(inList1);
+ input2 = new UnionIterator<>(inList2);
- final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
- ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
- new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+ ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildSecondHashMatchIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -386,7 +379,7 @@ public class ReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -403,16 +396,16 @@ public class ReusingHashMatchIteratorITCase {
try {
MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
- final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+ final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
collectIntPairData(input1),
- collectRecordData(input2));
+ collectTupleData(input2));
- final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
input1 = new UniformIntPairGenerator(500, 40, false);
@@ -420,8 +413,8 @@ public class ReusingHashMatchIteratorITCase {
input2.reset();
// compare with iterator values
- ReusingBuildSecondHashMatchIterator<IntPair, Record, Record> iterator =
- new ReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
+ ReusingBuildSecondHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildSecondHashMatchIterator<>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -433,7 +426,7 @@ public class ReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -450,16 +443,16 @@ public class ReusingHashMatchIteratorITCase {
try {
MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
- final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+ final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
collectIntPairData(input1),
- collectRecordData(input2));
+ collectTupleData(input2));
- final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
input1 = new UniformIntPairGenerator(500, 40, false);
@@ -467,8 +460,8 @@ public class ReusingHashMatchIteratorITCase {
input2.reset();
// compare with iterator values
- ReusingBuildFirstHashMatchIterator<IntPair, Record, Record> iterator =
- new ReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
+ ReusingBuildFirstHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstHashMatchIterator<>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -480,7 +473,7 @@ public class ReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -498,29 +491,29 @@ public class ReusingHashMatchIteratorITCase {
- static Map<TestData.Key, Collection<RecordMatch>> matchRecordValues(
- Map<TestData.Key, Collection<TestData.Value>> leftMap,
- Map<TestData.Key, Collection<TestData.Value>> rightMap)
+ static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
+ Map<Integer, Collection<String>> leftMap,
+ Map<Integer, Collection<String>> rightMap)
{
- Map<TestData.Key, Collection<RecordMatch>> map = new HashMap<TestData.Key, Collection<RecordMatch>>();
+ Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
- for (TestData.Key key : leftMap.keySet()) {
- Collection<TestData.Value> leftValues = leftMap.get(key);
- Collection<TestData.Value> rightValues = rightMap.get(key);
+ for (Integer key : leftMap.keySet()) {
+ Collection<String> leftValues = leftMap.get(key);
+ Collection<String> rightValues = rightMap.get(key);
if (rightValues == null) {
continue;
}
if (!map.containsKey(key)) {
- map.put(key, new ArrayList<RecordMatch>());
+ map.put(key, new ArrayList<TupleMatch>());
}
- Collection<RecordMatch> matchedValues = map.get(key);
+ Collection<TupleMatch> matchedValues = map.get(key);
- for (TestData.Value leftValue : leftValues) {
- for (TestData.Value rightValue : rightValues) {
- matchedValues.add(new RecordMatch(leftValue, rightValue));
+ for (String leftValue : leftValues) {
+ for (String rightValue : rightValues) {
+ matchedValues.add(new TupleMatch(leftValue, rightValue));
}
}
}
@@ -528,32 +521,32 @@ public class ReusingHashMatchIteratorITCase {
return map;
}
- static Map<TestData.Key, Collection<RecordIntPairMatch>> matchRecordIntPairValues(
+ static Map<Integer, Collection<TupleIntPairMatch>> matchTupleIntPairValues(
Map<Integer, Collection<Integer>> leftMap,
- Map<TestData.Key, Collection<TestData.Value>> rightMap)
+ Map<Integer, Collection<String>> rightMap)
{
- final Map<TestData.Key, Collection<RecordIntPairMatch>> map = new HashMap<TestData.Key, Collection<RecordIntPairMatch>>();
+ final Map<Integer, Collection<TupleIntPairMatch>> map = new HashMap<>();
for (Integer i : leftMap.keySet()) {
- final TestData.Key key = new TestData.Key(i.intValue());
+ final Integer key = new Integer(i.intValue());
final Collection<Integer> leftValues = leftMap.get(i);
- final Collection<TestData.Value> rightValues = rightMap.get(key);
+ final Collection<String> rightValues = rightMap.get(key);
if (rightValues == null) {
continue;
}
if (!map.containsKey(key)) {
- map.put(key, new ArrayList<RecordIntPairMatch>());
+ map.put(key, new ArrayList<TupleIntPairMatch>());
}
- final Collection<RecordIntPairMatch> matchedValues = map.get(key);
+ final Collection<TupleIntPairMatch> matchedValues = map.get(key);
for (Integer v : leftValues) {
- for (TestData.Value val : rightValues) {
- matchedValues.add(new RecordIntPairMatch(v, val));
+ for (String val : rightValues) {
+ matchedValues.add(new TupleIntPairMatch(v, val));
}
}
}
@@ -562,21 +555,21 @@ public class ReusingHashMatchIteratorITCase {
}
- static Map<TestData.Key, Collection<TestData.Value>> collectRecordData(MutableObjectIterator<Record> iter)
+ static Map<Integer, Collection<String>> collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
throws Exception
{
- Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
- Record pair = new Record();
+ Map<Integer, Collection<String>> map = new HashMap<>();
+ Tuple2<Integer, String> pair = new Tuple2<>();
while ((pair = iter.next(pair)) != null) {
- TestData.Key key = pair.getField(0, TestData.Key.class);
+ Integer key = pair.f0;
if (!map.containsKey(key)) {
- map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+ map.put(key, new ArrayList<String>());
}
- Collection<TestData.Value> values = map.get(key);
- values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+ Collection<String> values = map.get(key);
+ values.add(pair.f1);
}
return map;
@@ -585,7 +578,7 @@ public class ReusingHashMatchIteratorITCase {
static Map<Integer, Collection<Integer>> collectIntPairData(MutableObjectIterator<IntPair> iter)
throws Exception
{
- Map<Integer, Collection<Integer>> map = new HashMap<Integer, Collection<Integer>>();
+ Map<Integer, Collection<Integer>> map = new HashMap<>();
IntPair pair = new IntPair();
while ((pair = iter.next(pair)) != null) {
@@ -606,19 +599,19 @@ public class ReusingHashMatchIteratorITCase {
/**
* Private class used for storage of the expected matches in a hash-map.
*/
- static class RecordMatch {
+ static class TupleMatch {
- private final Value left;
- private final Value right;
+ private final String left;
+ private final String right;
- public RecordMatch(Value left, Value right) {
+ public TupleMatch(String left, String right) {
this.left = left;
this.right = right;
}
@Override
public boolean equals(Object obj) {
- RecordMatch o = (RecordMatch) obj;
+ TupleMatch o = (TupleMatch) obj;
return this.left.equals(o.left) && this.right.equals(o.right);
}
@@ -636,19 +629,19 @@ public class ReusingHashMatchIteratorITCase {
/**
* Private class used for storage of the expected matches in a hash-map.
*/
- static class RecordIntPairMatch
+ static class TupleIntPairMatch
{
private final int left;
- private final Value right;
+ private final String right;
- public RecordIntPairMatch(int left, Value right) {
+ public TupleIntPairMatch(int left, String right) {
this.left = left;
this.right = right;
}
@Override
public boolean equals(Object obj) {
- RecordIntPairMatch o = (RecordIntPairMatch) obj;
+ TupleIntPairMatch o = (TupleIntPairMatch) obj;
return this.left == o.left && this.right.equals(o.right);
}
@@ -663,28 +656,28 @@ public class ReusingHashMatchIteratorITCase {
}
}
- static final class RecordMatchRemovingJoin extends JoinFunction
+ static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
{
- private final Map<TestData.Key, Collection<RecordMatch>> toRemoveFrom;
+ private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
- protected RecordMatchRemovingJoin(Map<TestData.Key, Collection<RecordMatch>> map) {
+ protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
this.toRemoveFrom = map;
}
@Override
- public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception
+ public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
{
- TestData.Key key = rec1.getField(0, TestData.Key.class);
- TestData.Value value1 = rec1.getField(1, TestData.Value.class);
- TestData.Value value2 = rec2.getField(1, TestData.Value.class);
+ Integer key = rec1.f0;
+ String value1 = rec1.f1;
+ String value2 = rec2.f1;
//System.err.println("rec1 key = "+key+" rec2 key= "+rec2.getField(0, TestData.Key.class));
- Collection<RecordMatch> matches = this.toRemoveFrom.get(key);
+ Collection<TupleMatch> matches = this.toRemoveFrom.get(key);
if (matches == null) {
Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
}
Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
- matches.remove(new RecordMatch(value1, value2)));
+ matches.remove(new TupleMatch(value1, value2)));
if (matches.isEmpty()) {
this.toRemoveFrom.remove(key);
@@ -692,32 +685,32 @@ public class ReusingHashMatchIteratorITCase {
}
}
- static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
+ static final class TupleIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
{
- private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom;
+ private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom;
- protected RecordIntPairMatchRemovingMatcher(Map<TestData.Key, Collection<RecordIntPairMatch>> map) {
+ protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
this.toRemoveFrom = map;
}
@Override
- public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception
+ public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
{
final int k = rec1.getKey();
final int v = rec1.getValue();
- final TestData.Key key = rec2.getField(0, TestData.Key.class);
- final TestData.Value value = rec2.getField(1, TestData.Value.class);
+ final Integer key = rec2.f0;
+ final String value = rec2.f1;
- Assert.assertTrue("Key does not match for matching IntPair Record combination.", k == key.getKey());
+ Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", k == key);
- Collection<RecordIntPairMatch> matches = this.toRemoveFrom.get(key);
+ Collection<TupleIntPairMatch> matches = this.toRemoveFrom.get(key);
if (matches == null) {
Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected.");
}
Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value,
- matches.remove(new RecordIntPairMatch(v, value)));
+ matches.remove(new TupleIntPairMatch(v, value)));
if (matches.isEmpty()) {
this.toRemoveFrom.remove(key);
@@ -725,7 +718,7 @@ public class ReusingHashMatchIteratorITCase {
}
}
- static final class IntPairRecordPairComparator extends TypePairComparator<IntPair, Record>
+ static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
{
private int reference;
@@ -735,33 +728,31 @@ public class ReusingHashMatchIteratorITCase {
}
@Override
- public boolean equalToReference(Record candidate) {
+ public boolean equalToReference(Tuple2<Integer, String> candidate) {
try {
- final IntValue i = candidate.getField(0, IntValue.class);
- return i.getValue() == this.reference;
+ return candidate.f0 == this.reference;
} catch (NullPointerException npex) {
throw new NullKeyFieldException();
}
}
@Override
- public int compareToReference(Record candidate) {
+ public int compareToReference(Tuple2<Integer, String> candidate) {
try {
- final IntValue i = candidate.getField(0, IntValue.class);
- return i.getValue() - this.reference;
+ return candidate.f0 - this.reference;
} catch (NullPointerException npex) {
throw new NullKeyFieldException();
}
}
}
- static final class RecordIntPairPairComparator extends TypePairComparator<Record, IntPair>
+ static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
{
private int reference;
@Override
- public void setReference(Record reference) {
- this.reference = reference.getField(0, IntValue.class).getValue();
+ public void setReference(Tuple2<Integer, String> reference) {
+ this.reference = reference.f0;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index ba5a325..2a306ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -28,14 +28,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -43,21 +42,16 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.RecordMatch;
-import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.RecordMatchRemovingJoin;
-import org.apache.flink.runtime.operators.hash.HashTableITCase.ConstantsKeyValuePairsIterator;
+import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.ReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
@@ -85,39 +79,35 @@ public class ReusingReOpenableHashTableITCase {
private IOManager ioManager;
private MemoryManager memoryManager;
- private TypeSerializer<Record> recordSerializer;
- private TypeComparator<Record> record1Comparator;
- private TypeComparator<Record> record2Comparator;
- private TypePairComparator<Record, Record> recordPairComparator;
+ private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+ private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+ private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+ private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
private static final AbstractInvokable MEM_OWNER = new DummyInvokable();
- private TypeSerializer<Record> recordBuildSideAccesssor;
- private TypeSerializer<Record> recordProbeSideAccesssor;
- private TypeComparator<Record> recordBuildSideComparator;
- private TypeComparator<Record> recordProbeSideComparator;
- private TypePairComparator<Record, Record> pactRecordComparator;
+ private TypeSerializer<Tuple2<Integer, Integer>> recordBuildSideAccesssor;
+ private TypeSerializer<Tuple2<Integer, Integer>> recordProbeSideAccesssor;
+ private TypeComparator<Tuple2<Integer, Integer>> recordBuildSideComparator;
+ private TypeComparator<Tuple2<Integer, Integer>> recordProbeSideComparator;
+ private TypePairComparator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> pactRecordComparator;
@SuppressWarnings({"unchecked", "rawtypes"})
@Before
public void beforeTest() {
- this.recordSerializer = RecordSerializer.get();
+ this.recordSerializer = TestData.getIntStringTupleSerializer();
- this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
- this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
- this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+ this.record1Comparator = TestData.getIntStringTupleComparator();
+ this.record2Comparator = TestData.getIntStringTupleComparator();
+ this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
-
- final int[] keyPos = new int[] {0};
- final Class<? extends Key>[] keyType = (Class<? extends Key>[]) new Class[] { IntValue.class };
-
- this.recordBuildSideAccesssor = RecordSerializer.get();
- this.recordProbeSideAccesssor = RecordSerializer.get();
- this.recordBuildSideComparator = new RecordComparator(keyPos, keyType);
- this.recordProbeSideComparator = new RecordComparator(keyPos, keyType);
- this.pactRecordComparator = new HashTableITCase.RecordPairComparatorFirstInt();
+ this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer();
+ this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer();
+ this.recordBuildSideComparator = TestData.getIntIntTupleComparator();
+ this.recordProbeSideComparator = TestData.getIntIntTupleComparator();
+ this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator);
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
this.ioManager = new IOManagerAsync();
@@ -152,11 +142,11 @@ public class ReusingReOpenableHashTableITCase {
int buildSize = 1000;
int probeSize = 1000;
try {
- Generator bgen = new Generator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
- Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+ TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
- final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+ final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+ final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
doTest(buildInput,probeInput, bgen, pgen);
}
catch (Exception e) {
@@ -174,11 +164,11 @@ public class ReusingReOpenableHashTableITCase {
int buildSize = 1000;
int probeSize = 1000;
try {
- Generator bgen = new Generator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- Generator pgen = new Generator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
- final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+ final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+ final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
doTest(buildInput,probeInput, bgen, pgen);
}
catch (Exception e) {
@@ -197,11 +187,11 @@ public class ReusingReOpenableHashTableITCase {
int buildSize = 1000;
int probeSize = 1000;
try {
- Generator bgen = new Generator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- Generator pgen = new Generator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
+ TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH);
- final TestData.GeneratorIterator buildInput = new TestData.GeneratorIterator(bgen, buildSize);
- final TestData.GeneratorIterator probeInput = new TestData.GeneratorIterator(pgen, probeSize);
+ final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize);
+ final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize);
doTest(buildInput,probeInput, bgen, pgen);
}
@@ -211,21 +201,21 @@ public class ReusingReOpenableHashTableITCase {
}
}
- private void doTest(TestData.GeneratorIterator buildInput, TestData.GeneratorIterator probeInput, Generator bgen, Generator pgen) throws Exception {
+ private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception {
// collect expected data
- final Map<TestData.Key, Collection<RecordMatch>> expectedFirstMatchesMap = ReusingHashMatchIteratorITCase.matchRecordValues(ReusingHashMatchIteratorITCase.collectRecordData(buildInput), ReusingHashMatchIteratorITCase.collectRecordData(probeInput));
+ final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = ReusingHashMatchIteratorITCase.matchSecondTupleFields(ReusingHashMatchIteratorITCase.collectTupleData(buildInput), ReusingHashMatchIteratorITCase.collectTupleData(probeInput));
- final List<Map<TestData.Key, Collection<RecordMatch>>> expectedNMatchesMapList = new ArrayList<Map<Key,Collection<RecordMatch>>>(NUM_PROBES);
- final JoinFunction[] nMatcher = new RecordMatchRemovingJoin[NUM_PROBES];
+ final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
+ final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
for(int i = 0; i < NUM_PROBES; i++) {
- Map<TestData.Key, Collection<RecordMatch>> tmp;
+ Map<Integer, Collection<TupleMatch>> tmp;
expectedNMatchesMapList.add(tmp = deepCopy(expectedFirstMatchesMap));
- nMatcher[i] = new RecordMatchRemovingJoin(tmp);
+ nMatcher[i] = new TupleMatchRemovingJoin(tmp);
}
- final JoinFunction firstMatcher = new RecordMatchRemovingJoin(expectedFirstMatchesMap);
+ final FlatJoinFunction firstMatcher = new TupleMatchRemovingJoin(expectedFirstMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
bgen.reset();
@@ -234,8 +224,8 @@ public class ReusingReOpenableHashTableITCase {
probeInput.reset();
// compare with iterator values
- ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record> iterator =
- new ReusingBuildFirstReOpenableHashMatchIterator<Record, Record, Record>(
+ ReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstReOpenableHashMatchIterator<>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -245,7 +235,7 @@ public class ReusingReOpenableHashTableITCase {
while (iterator.callWithNextKey(firstMatcher, collector));
// assert that each expected match was seen for the first input
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedFirstMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedFirstMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -260,7 +250,7 @@ public class ReusingReOpenableHashTableITCase {
while (iterator.callWithNextKey(nMatcher[i], collector));
// assert that each expected match was seen for the second input
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedNMatchesMapList.get(i).entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -276,16 +266,16 @@ public class ReusingReOpenableHashTableITCase {
//
//
- private MutableObjectIterator<Record> getProbeInput(final int numKeys,
+ private MutableObjectIterator<Tuple2<Integer, Integer>> getProbeInput(final int numKeys,
final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) {
- MutableObjectIterator<Record> probe1 = new UniformRecordGenerator(numKeys, probeValsPerKey, true);
- MutableObjectIterator<Record> probe2 = new ConstantsKeyValuePairsIterator(repeatedValue1, 17, 5);
- MutableObjectIterator<Record> probe3 = new ConstantsKeyValuePairsIterator(repeatedValue2, 23, 5);
- List<MutableObjectIterator<Record>> probes = new ArrayList<MutableObjectIterator<Record>>();
+ MutableObjectIterator<Tuple2<Integer, Integer>> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5);
+ List<MutableObjectIterator<Tuple2<Integer, Integer>>> probes = new ArrayList<>();
probes.add(probe1);
probes.add(probe2);
probes.add(probe3);
- return new UnionIterator<Record>(probes);
+ return new UnionIterator<>(probes);
}
@Test
@@ -302,14 +292,14 @@ public class ReusingReOpenableHashTableITCase {
final int PROBE_VALS_PER_KEY = 10;
// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
- MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
- MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
- MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
- List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+ MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+ List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
builds.add(build1);
builds.add(build2);
builds.add(build3);
- MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+ MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
// allocate the memory for the HashTable
List<MemorySegment> memSegments;
@@ -326,40 +316,40 @@ public class ReusingReOpenableHashTableITCase {
// ----------------------------------------------------------------------------------------
- final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+ final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
memSegments, ioManager, true);
for(int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
- MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if(probe == 0) {
join.open(buildInput, probeInput);
} else {
join.reopenProbe(probeInput);
}
- Record record;
- final Record recordReuse = new Record();
+ Tuple2<Integer, Integer> record;
+ final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
while (join.nextRecord()) {
long numBuildValues = 0;
- final Record probeRec = join.getCurrentProbeRecord();
- int key = probeRec.getField(0, IntValue.class).getValue();
+ final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+ Integer key = probeRec.f0;
- HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+ HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
}
else {
fail("No build side values found for a probe key.");
}
while ((record = buildSide.next(record)) != null) {
numBuildValues++;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
}
Long contained = map.get(key);
@@ -416,14 +406,14 @@ public class ReusingReOpenableHashTableITCase {
final int PROBE_VALS_PER_KEY = 10;
// create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys
- MutableObjectIterator<Record> build1 = new UniformRecordGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
- MutableObjectIterator<Record> build2 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
- MutableObjectIterator<Record> build3 = new ConstantsKeyValuePairsIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
- List<MutableObjectIterator<Record>> builds = new ArrayList<MutableObjectIterator<Record>>();
+ MutableObjectIterator<Tuple2<Integer, Integer>> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD);
+ MutableObjectIterator<Tuple2<Integer, Integer>> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD);
+ List<MutableObjectIterator<Tuple2<Integer, Integer>>> builds = new ArrayList<>();
builds.add(build1);
builds.add(build2);
builds.add(build3);
- MutableObjectIterator<Record> buildInput = new UnionIterator<Record>(builds);
+ MutableObjectIterator<Tuple2<Integer, Integer>> buildInput = new UnionIterator<>(builds);
// allocate the memory for the HashTable
@@ -441,40 +431,40 @@ public class ReusingReOpenableHashTableITCase {
// ----------------------------------------------------------------------------------------
- final ReOpenableMutableHashTable<Record, Record> join = new ReOpenableMutableHashTable<Record, Record>(
+ final ReOpenableMutableHashTable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> join = new ReOpenableMutableHashTable<>(
this.recordBuildSideAccesssor, this.recordProbeSideAccesssor,
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator,
memSegments, ioManager, true);
for (int probe = 0; probe < NUM_PROBES; probe++) {
// create a probe input that gives 10 million pairs with 10 values sharing a key
- MutableObjectIterator<Record> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
+ MutableObjectIterator<Tuple2<Integer, Integer>> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2);
if(probe == 0) {
join.open(buildInput, probeInput);
} else {
join.reopenProbe(probeInput);
}
- Record record;
- final Record recordReuse = new Record();
+ Tuple2<Integer, Integer> record;
+ final Tuple2<Integer, Integer> recordReuse = new Tuple2<>();
while (join.nextRecord())
{
long numBuildValues = 0;
- final Record probeRec = join.getCurrentProbeRecord();
- int key = probeRec.getField(0, IntValue.class).getValue();
+ final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
+ Integer key = probeRec.f0;
- HashBucketIterator<Record, Record> buildSide = join.getBuildSideIterator();
+ HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
if ((record = buildSide.next(recordReuse)) != null) {
numBuildValues = 1;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
}
else {
fail("No build side values found for a probe key.");
}
while ((record = buildSide.next(recordReuse)) != null) {
numBuildValues++;
- Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue());
+ Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
}
Long contained = map.get(key);
@@ -511,11 +501,11 @@ public class ReusingReOpenableHashTableITCase {
}
- static Map<Key, Collection<RecordMatch>> deepCopy(Map<Key, Collection<RecordMatch>> expectedSecondMatchesMap) {
- Map<Key, Collection<RecordMatch>> copy = new HashMap<Key, Collection<RecordMatch>>(expectedSecondMatchesMap.size());
- for(Map.Entry<Key, Collection<RecordMatch>> entry : expectedSecondMatchesMap.entrySet()) {
- List<RecordMatch> matches = new ArrayList<RecordMatch>(entry.getValue().size());
- for(RecordMatch m : entry.getValue()) {
+ static Map<Integer, Collection<TupleMatch>> deepCopy(Map<Integer, Collection<TupleMatch>> expectedSecondMatchesMap) {
+ Map<Integer, Collection<TupleMatch>> copy = new HashMap<>(expectedSecondMatchesMap.size());
+ for(Map.Entry<Integer, Collection<TupleMatch>> entry : expectedSecondMatchesMap.entrySet()) {
+ List<TupleMatch> matches = new ArrayList<TupleMatch>(entry.getValue().size());
+ for(TupleMatch m : entry.getValue()) {
matches.add(m);
}
copy.put(entry.getKey(), matches);