You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/19 18:01:13 UTC
[07/10] flink git commit: [FLINK-2107] Add hash-based strategies for left and right outer joins.
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
deleted file mode 100644
index 1795062..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
+++ /dev/null
@@ -1,766 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-@SuppressWarnings({"serial", "deprecation"})
-public class NonReusingHashMatchIteratorITCase {
-
- private static final int MEMORY_SIZE = 16000000; // total memory
-
- private static final int INPUT_1_SIZE = 20000;
- private static final int INPUT_2_SIZE = 1000;
-
- private static final long SEED1 = 561349061987311L;
- private static final long SEED2 = 231434613412342L;
-
- private final AbstractInvokable parentTask = new DummyInvokable();
-
- private IOManager ioManager;
- private MemoryManager memoryManager;
-
- private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
- private TypeComparator<Tuple2<Integer, String>> record1Comparator;
- private TypeComparator<Tuple2<Integer, String>> record2Comparator;
- private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
-
- private TypeSerializer<IntPair> pairSerializer;
- private TypeComparator<IntPair> pairComparator;
- private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator;
- private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator;
-
-
- @SuppressWarnings("unchecked")
- @Before
- public void beforeTest() {
- this.recordSerializer = TestData.getIntStringTupleSerializer();
-
- this.record1Comparator = TestData.getIntStringTupleComparator();
- this.record2Comparator = TestData.getIntStringTupleComparator();
-
- this.recordPairComparator = new GenericPairComparator(record1Comparator, record2Comparator);
-
- this.pairSerializer = new IntPairSerializer();
- this.pairComparator = new TestData.IntPairComparator();
- this.pairRecordPairComparator = new IntPairTuplePairComparator();
- this.recordPairPairComparator = new TupleIntPairPairComparator();
-
- this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
- this.ioManager = new IOManagerAsync();
- }
-
- @After
- public void afterTest() {
- if (this.ioManager != null) {
- this.ioManager.shutdown();
- if (!this.ioManager.isProperlyShutDown()) {
- Assert.fail("I/O manager failed to properly shut down.");
- }
- this.ioManager = null;
- }
-
- if (this.memoryManager != null) {
- Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
- this.memoryManager.verifyEmpty());
- this.memoryManager.shutdown();
- this.memoryManager = null;
- }
- }
-
-
- @Test
- public void testBuildFirst() {
- try {
- TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
- final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- // collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
- collectTupleData(input1),
- collectTupleData(input2));
-
- final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
-
- // reset the generators
- generator1.reset();
- generator2.reset();
- input1.reset();
- input2.reset();
-
- // compare with iterator values
- NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new NonReusingBuildFirstHashMatchIterator<>(
- input1, input2, this.recordSerializer, this.record1Comparator,
- this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildFirstWithHighNumberOfCommonKeys()
- {
- // the size of the left and right inputs
- final int INPUT_1_SIZE = 200;
- final int INPUT_2_SIZE = 100;
-
- final int INPUT_1_DUPLICATES = 10;
- final int INPUT_2_DUPLICATES = 2000;
- final int DUPLICATE_KEY = 13;
-
- try {
- TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
- final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
- final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
-
- final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
- inList1.add(gen1Iter);
- inList1.add(const1Iter);
-
- final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
- inList2.add(gen2Iter);
- inList2.add(const2Iter);
-
- MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
- MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
-
-
- // collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
- collectTupleData(input1),
- collectTupleData(input2));
-
- // re-create the whole thing for actual processing
-
- // reset the generators and iterators
- generator1.reset();
- generator2.reset();
- const1Iter.reset();
- const2Iter.reset();
- gen1Iter.reset();
- gen2Iter.reset();
-
- inList1.clear();
- inList1.add(gen1Iter);
- inList1.add(const1Iter);
-
- inList2.clear();
- inList2.add(gen2Iter);
- inList2.add(const2Iter);
-
- input1 = new UnionIterator<>(inList1);
- input2 = new UnionIterator<>(inList2);
-
- final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new NonReusingBuildFirstHashMatchIterator<>(
- input1, input2, this.recordSerializer, this.record1Comparator,
- this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildSecond() {
- try {
- TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
- final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- // collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
- collectTupleData(input1),
- collectTupleData(input2));
-
- final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- // reset the generators
- generator1.reset();
- generator2.reset();
- input1.reset();
- input2.reset();
-
- // compare with iterator values
- NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new NonReusingBuildSecondHashMatchIterator<>(
- input1, input2, this.recordSerializer, this.record1Comparator,
- this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildSecondWithHighNumberOfCommonKeys()
- {
- // the size of the left and right inputs
- final int INPUT_1_SIZE = 200;
- final int INPUT_2_SIZE = 100;
-
- final int INPUT_1_DUPLICATES = 10;
- final int INPUT_2_DUPLICATES = 2000;
- final int DUPLICATE_KEY = 13;
-
- try {
- TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
- final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
- final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
-
- final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
- inList1.add(gen1Iter);
- inList1.add(const1Iter);
-
- final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
- inList2.add(gen2Iter);
- inList2.add(const2Iter);
-
- MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
- MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
-
-
- // collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
- collectTupleData(input1),
- collectTupleData(input2));
-
- // re-create the whole thing for actual processing
-
- // reset the generators and iterators
- generator1.reset();
- generator2.reset();
- const1Iter.reset();
- const2Iter.reset();
- gen1Iter.reset();
- gen2Iter.reset();
-
- inList1.clear();
- inList1.add(gen1Iter);
- inList1.add(const1Iter);
-
- inList2.clear();
- inList2.add(gen2Iter);
- inList2.add(const2Iter);
-
- input1 = new UnionIterator<>(inList1);
- input2 = new UnionIterator<>(inList2);
-
- final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new NonReusingBuildSecondHashMatchIterator<>(
- input1, input2, this.recordSerializer, this.record1Comparator,
- this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildFirstWithMixedDataTypes() {
- try {
- MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
-
- final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- // collect expected data
- final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
- collectIntPairData(input1),
- collectTupleData(input2));
-
- final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
-
- // reset the generators
- input1 = new UniformIntPairGenerator(500, 40, false);
- generator2.reset();
- input2.reset();
-
- // compare with iterator values
- NonReusingBuildSecondHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new NonReusingBuildSecondHashMatchIterator<>(
- input1, input2, this.pairSerializer, this.pairComparator,
- this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
- this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildSecondWithMixedDataTypes() {
- try {
- MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
-
- final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- // collect expected data
- final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
- collectIntPairData(input1),
- collectTupleData(input2));
-
- final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- // reset the generators
- input1 = new UniformIntPairGenerator(500, 40, false);
- generator2.reset();
- input2.reset();
-
- // compare with iterator values
- NonReusingBuildFirstHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new NonReusingBuildFirstHashMatchIterator<>(
- input1, input2, this.pairSerializer, this.pairComparator,
- this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
- this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
-
-
- static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
- Map<Integer, Collection<String>> leftMap,
- Map<Integer, Collection<String>> rightMap)
- {
- Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
-
- for (Integer key : leftMap.keySet()) {
- Collection<String> leftValues = leftMap.get(key);
- Collection<String> rightValues = rightMap.get(key);
-
- if (rightValues == null) {
- continue;
- }
-
- if (!map.containsKey(key)) {
- map.put(key, new ArrayList<TupleMatch>());
- }
-
- Collection<TupleMatch> matchedValues = map.get(key);
-
- for (String leftValue : leftValues) {
- for (String rightValue : rightValues) {
- matchedValues.add(new TupleMatch(leftValue, rightValue));
- }
- }
- }
-
- return map;
- }
-
- static Map<Integer, Collection<TupleIntPairMatch>> matchTupleIntPairValues(
- Map<Integer, Collection<Integer>> leftMap,
- Map<Integer, Collection<String>> rightMap)
- {
- final Map<Integer, Collection<TupleIntPairMatch>> map = new HashMap<>();
-
- for (Integer i : leftMap.keySet()) {
-
- final Collection<Integer> leftValues = leftMap.get(i);
- final Collection<String> rightValues = rightMap.get(i);
-
- if (rightValues == null) {
- continue;
- }
-
- if (!map.containsKey(i)) {
- map.put(i, new ArrayList<TupleIntPairMatch>());
- }
-
- final Collection<TupleIntPairMatch> matchedValues = map.get(i);
-
- for (Integer v : leftValues) {
- for (String val : rightValues) {
- matchedValues.add(new TupleIntPairMatch(v, val));
- }
- }
- }
-
- return map;
- }
-
-
- static Map<Integer, Collection<String>> collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
- throws Exception
- {
- Map<Integer, Collection<String>> map = new HashMap<>();
- Tuple2<Integer, String> pair = new Tuple2<>();
-
- while ((pair = iter.next(pair)) != null) {
-
- Integer key = pair.f0;
- if (!map.containsKey(key)) {
- map.put(key, new ArrayList<String>());
- }
-
- Collection<String> values = map.get(key);
- values.add(pair.f1);
- }
-
- return map;
- }
-
- static Map<Integer, Collection<Integer>> collectIntPairData(MutableObjectIterator<IntPair> iter)
- throws Exception
- {
- Map<Integer, Collection<Integer>> map = new HashMap<Integer, Collection<Integer>>();
- IntPair pair = new IntPair();
-
- while ((pair = iter.next(pair)) != null) {
-
- final int key = pair.getKey();
- final int value = pair.getValue();
- if (!map.containsKey(key)) {
- map.put(key, new ArrayList<Integer>());
- }
-
- Collection<Integer> values = map.get(key);
- values.add(value);
- }
-
- return map;
- }
-
- /**
- * Private class used for storage of the expected matches in a hash-map.
- */
- static class TupleMatch {
-
- private final String left;
- private final String right;
-
- public TupleMatch(String left, String right) {
- this.left = left;
- this.right = right;
- }
-
- @Override
- public boolean equals(Object obj) {
- TupleMatch o = (TupleMatch) obj;
- return this.left.equals(o.left) && this.right.equals(o.right);
- }
-
- @Override
- public int hashCode() {
- return this.left.hashCode() ^ this.right.hashCode();
- }
-
- @Override
- public String toString() {
- return left + ", " + right;
- }
- }
-
- /**
- * Private class used for storage of the expected matches in a hash-map.
- */
- static class TupleIntPairMatch
- {
- private final int left;
- private final String right;
-
- public TupleIntPairMatch(int left, String right) {
- this.left = left;
- this.right = new String(right);
- }
-
- @Override
- public boolean equals(Object obj) {
- TupleIntPairMatch o = (TupleIntPairMatch) obj;
- return this.left == o.left && this.right.equals(o.right);
- }
-
- @Override
- public int hashCode() {
- return this.left ^ this.right.hashCode();
- }
-
- @Override
- public String toString() {
- return left + ", " + right;
- }
- }
-
- static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
- {
- private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
-
- protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
- this.toRemoveFrom = map;
- }
-
- @Override
- public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
- {
- int key = rec1.f0;
- String value1 = rec1.f1;
- String value2 = rec2.f1;
- //System.err.println("rec1 key = "+key+" rec2 key= "+rec2.f0);
- Collection<TupleMatch> matches = this.toRemoveFrom.get(key);
- if (matches == null) {
- Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
- }
-
- Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
- matches.remove(new TupleMatch(value1, value2)));
-
- if (matches.isEmpty()) {
- this.toRemoveFrom.remove(key);
- }
- }
- }
-
- static final class TupleIntPairMatchRemovingMatcher implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
- {
- private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom;
-
- protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
- this.toRemoveFrom = map;
- }
-
- @Override
- public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
- {
- final int k = rec1.getKey();
- final int v = rec1.getValue();
-
- final Integer key = rec2.f0;
- final String value = rec2.f1;
-
- Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", k == key);
-
- Collection<TupleIntPairMatch> matches = this.toRemoveFrom.get(key);
- if (matches == null) {
- Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected.");
- }
-
- Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value,
- matches.remove(new TupleIntPairMatch(v, value)));
-
- if (matches.isEmpty()) {
- this.toRemoveFrom.remove(key);
- }
- }
- }
-
- static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
- {
- private int reference;
-
- @Override
- public void setReference(IntPair reference) {
- this.reference = reference.getKey();
- }
-
- @Override
- public boolean equalToReference(Tuple2<Integer, String> candidate) {
- try {
- return candidate.f0 == this.reference;
- } catch (NullPointerException npex) {
- throw new NullKeyFieldException();
- }
- }
-
- @Override
- public int compareToReference(Tuple2<Integer, String> candidate) {
- try {
- return candidate.f0 - this.reference;
- } catch (NullPointerException npex) {
- throw new NullKeyFieldException();
- }
- }
- }
-
- static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
- {
- private int reference;
-
- @Override
- public void setReference(Tuple2<Integer, String> reference) {
- this.reference = reference.f0;
- }
-
- @Override
- public boolean equalToReference(IntPair candidate) {
- return this.reference == candidate.getKey();
- }
-
- @Override
- public int compareToReference(IntPair candidate) {
- return candidate.getKey() - this.reference;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
index 5a4fc6a..a885e6b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java
@@ -30,8 +30,8 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
-import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatch;
-import org.apache.flink.runtime.operators.hash.NonReusingHashMatchIteratorITCase.TupleMatchRemovingJoin;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
@@ -204,7 +204,7 @@ public class NonReusingReOpenableHashTableITCase {
private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception {
// collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = NonReusingHashMatchIteratorITCase.matchSecondTupleFields(NonReusingHashMatchIteratorITCase.collectTupleData(buildInput), NonReusingHashMatchIteratorITCase.collectTupleData(probeInput));
+ final Map<Integer, Collection<TupleMatch>> expectedFirstMatchesMap = NonReusingHashJoinIteratorITCase.joinTuples(NonReusingHashJoinIteratorITCase.collectTupleData(buildInput), NonReusingHashJoinIteratorITCase.collectTupleData(probeInput));
final List<Map<Integer, Collection<TupleMatch>>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES);
final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES];
@@ -225,11 +225,11 @@ public class NonReusingReOpenableHashTableITCase {
probeInput.reset();
// compare with iterator values
- NonReusingBuildFirstReOpenableHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new NonReusingBuildFirstReOpenableHashMatchIterator<>(
+ NonReusingBuildFirstReOpenableHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new NonReusingBuildFirstReOpenableHashJoinIterator<>(
buildInput, probeInput, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
+ this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
iterator.open();
// do first join with both inputs
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
new file mode 100644
index 0000000..87707a4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java
@@ -0,0 +1,709 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators.hash;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypePairComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
+import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleIntPairMatch;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DummyInvokable;
+import org.apache.flink.runtime.operators.testutils.TestData;
+import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
+import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
+import org.apache.flink.types.NullKeyFieldException;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.leftOuterJoinTuples;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.rightOuterJoinTuples;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinIntPairs;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData;
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectIntPairData;
+
+@SuppressWarnings({"serial", "deprecation"})
+public class ReusingHashJoinIteratorITCase {
+
+ private static final int MEMORY_SIZE = 16000000; // total memory handed to the MemoryManager in beforeTest()
+
+ private static final int INPUT_1_SIZE = 20000; // passed to TupleGeneratorIterator for input 1; shadowed by locals in the *HighNumberOfCommonKeys tests
+ private static final int INPUT_2_SIZE = 1000; // passed to TupleGeneratorIterator for input 2; shadowed likewise
+
+ private static final long SEED1 = 561349061987311L; // first constructor argument of the TupleGenerator for input 1
+ private static final long SEED2 = 231434613412342L; // first constructor argument of the TupleGenerator for input 2
+
+ private final AbstractInvokable parentTask = new DummyInvokable(); // passed to every iterator constructor
+
+ private IOManager ioManager; // created in beforeTest(), shut down in afterTest()
+ private MemoryManager memoryManager; // created in beforeTest(), verified empty and shut down in afterTest()
+
+ private TypeSerializer<Tuple2<Integer, String>> recordSerializer; // serializer used for both sides in the tuple/tuple tests
+ private TypeComparator<Tuple2<Integer, String>> record1Comparator; // comparator for tuple input 1
+ private TypeComparator<Tuple2<Integer, String>> record2Comparator; // comparator for tuple input 2
+ private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator; // built from the two comparators above
+
+ private TypeSerializer<IntPair> pairSerializer; // serializer for the IntPair side of the mixed-type tests
+ private TypeComparator<IntPair> pairComparator; // comparator for the IntPair side of the mixed-type tests
+ private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator; // IntPair reference, tuple candidate
+ private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator; // tuple reference, IntPair candidate
+
+
+ @SuppressWarnings("unchecked") // needed for the raw GenericPairComparator construction below
+ @Before
+ public void beforeTest() { // creates fresh serializers, comparators, memory manager and I/O manager for each test
+ this.recordSerializer = TestData.getIntStringTupleSerializer();
+
+ this.record1Comparator = TestData.getIntStringTupleComparator(); // one comparator per input
+ this.record2Comparator = TestData.getIntStringTupleComparator();
+
+ this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator); // raw type, hence the suppression
+
+ this.pairSerializer = new IntPairSerializer();
+ this.pairComparator = new TestData.IntPairComparator();
+ this.pairRecordPairComparator = new IntPairTuplePairComparator(); // helper classes defined at the bottom of this file
+ this.recordPairPairComparator = new TupleIntPairPairComparator();
+
+ this.memoryManager = new MemoryManager(MEMORY_SIZE, 1); // NOTE(review): meaning of the second argument is not visible here
+ this.ioManager = new IOManagerAsync();
+ }
+
+ @After
+ public void afterTest() { // shuts down the I/O manager and memory manager and fails the test if either was left in a bad state
+ if (this.ioManager != null) {
+ this.ioManager.shutdown();
+ if (!this.ioManager.isProperlyShutDown()) {
+ Assert.fail("I/O manager failed to properly shut down."); // throws, so the memory manager below is not shut down in this case
+ }
+ this.ioManager = null;
+ }
+
+ if (this.memoryManager != null) {
+ Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
+ this.memoryManager.verifyEmpty()); // checked before shutdown so a test that leaked memory is reported
+ this.memoryManager.shutdown();
+ this.memoryManager = null;
+ }
+ }
+
+
+ @Test
+ public void testBuildFirst() { // checks ReusingBuildFirstHashJoinIterator against the expectation computed by joinTuples
+ try {
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+ // collect expected data
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = joinTuples(
+ collectTupleData(input1),
+ collectTupleData(input2));
+
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); // removes each produced match from the map
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+ // reset the generators
+ generator1.reset();
+ generator2.reset();
+ input1.reset();
+ input2.reset();
+
+ // compare with iterator values
+ ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstHashJoinIterator<>(
+ input1, input2, this.recordSerializer, this.record1Comparator,
+ this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+ this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+
+ iterator.open();
+ try { // close() must run even if an assertion in the join function throws; otherwise afterTest()'s memory check presumably fails as well
+ while (iterator.callWithNextKey(matcher, collector)); // empty body is intentional: all checking happens in the join function
+ } finally {
+ iterator.close();
+ }
+ // assert that each expected match was seen
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBuildFirstWithHighNumberOfCommonKeys() // build-first join where both inputs also carry many records for one shared key
+ {
+ // the size of the left and right inputs
+ final int INPUT_1_SIZE = 200; // shadows the class-level constant of the same name
+ final int INPUT_2_SIZE = 100; // shadows the class-level constant of the same name
+
+ final int INPUT_1_DUPLICATES = 10;
+ final int INPUT_2_DUPLICATES = 2000;
+ final int DUPLICATE_KEY = 13;
+
+ try {
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+ final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+ final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+ final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
+ inList1.add(gen1Iter);
+ inList1.add(const1Iter);
+
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
+ inList2.add(gen2Iter);
+ inList2.add(const2Iter);
+
+ MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+ MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
+
+
+ // collect expected data
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = joinTuples(
+ collectTupleData(input1),
+ collectTupleData(input2));
+
+ // re-create the whole thing for actual processing
+
+ // reset the generators and iterators
+ generator1.reset();
+ generator2.reset();
+ const1Iter.reset();
+ const2Iter.reset();
+ gen1Iter.reset();
+ gen2Iter.reset();
+
+ inList1.clear();
+ inList1.add(gen1Iter);
+ inList1.add(const1Iter);
+
+ inList2.clear();
+ inList2.add(gen2Iter);
+ inList2.add(const2Iter);
+
+ input1 = new UnionIterator<>(inList1);
+ input2 = new UnionIterator<>(inList2);
+
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); // removes each produced match from the map
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+ ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstHashJoinIterator<>(
+ input1, input2, this.recordSerializer, this.record1Comparator,
+ this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+ this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+
+ iterator.open();
+ try { // close() must run even if an assertion in the join function throws; otherwise afterTest()'s memory check presumably fails as well
+ while (iterator.callWithNextKey(matcher, collector)); // empty body is intentional: all checking happens in the join function
+ } finally {
+ iterator.close();
+ }
+ // assert that each expected match was seen
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBuildSecond() { // checks ReusingBuildSecondHashJoinIterator against the expectation computed by joinTuples
+ try {
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+ // collect expected data
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = joinTuples(
+ collectTupleData(input1),
+ collectTupleData(input2));
+
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); // removes each produced match from the map
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+ // reset the generators
+ generator1.reset();
+ generator2.reset();
+ input1.reset();
+ input2.reset();
+
+ // compare with iterator values
+ ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildSecondHashJoinIterator<>(
+ input1, input2, this.recordSerializer, this.record1Comparator,
+ this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+ this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+
+ iterator.open();
+ try { // close() must run even if an assertion in the join function throws; otherwise afterTest()'s memory check presumably fails as well
+ while (iterator.callWithNextKey(matcher, collector)); // empty body is intentional: all checking happens in the join function
+ } finally {
+ iterator.close();
+ }
+ // assert that each expected match was seen
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBuildSecondWithHighNumberOfCommonKeys() // build-second join where both inputs also carry many records for one shared key
+ {
+ // the size of the left and right inputs
+ final int INPUT_1_SIZE = 200; // shadows the class-level constant of the same name
+ final int INPUT_2_SIZE = 100; // shadows the class-level constant of the same name
+
+ final int INPUT_1_DUPLICATES = 10;
+ final int INPUT_2_DUPLICATES = 2000;
+ final int DUPLICATE_KEY = 13;
+
+ try {
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+ final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+ final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+ final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
+ inList1.add(gen1Iter);
+ inList1.add(const1Iter);
+
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
+ inList2.add(gen2Iter);
+ inList2.add(const2Iter);
+
+ MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+ MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
+
+
+ // collect expected data
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = joinTuples(
+ collectTupleData(input1),
+ collectTupleData(input2));
+
+ // re-create the whole thing for actual processing
+
+ // reset the generators and iterators
+ generator1.reset();
+ generator2.reset();
+ const1Iter.reset();
+ const2Iter.reset();
+ gen1Iter.reset();
+ gen2Iter.reset();
+
+ inList1.clear();
+ inList1.add(gen1Iter);
+ inList1.add(const1Iter);
+
+ inList2.clear();
+ inList2.add(gen2Iter);
+ inList2.add(const2Iter);
+
+ input1 = new UnionIterator<>(inList1);
+ input2 = new UnionIterator<>(inList2);
+
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); // removes each produced match from the map
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+ ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildSecondHashJoinIterator<>(
+ input1, input2, this.recordSerializer, this.record1Comparator,
+ this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+ this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+
+ iterator.open();
+ try { // close() must run even if an assertion in the join function throws; otherwise afterTest()'s memory check presumably fails as well
+ while (iterator.callWithNextKey(matcher, collector)); // empty body is intentional: all checking happens in the join function
+ } finally {
+ iterator.close();
+ }
+ // assert that each expected match was seen
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBuildFirstWithMixedDataTypes() { // NOTE(review): despite the name, this test drives ReusingBuildSecondHashJoinIterator (IntPair input 1, tuple input 2)
+ try {
+ MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
+
+ final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+ // collect expected data
+ final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = joinIntPairs(
+ collectIntPairData(input1),
+ collectTupleData(input2));
+
+ final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+ // reset the generators
+ input1 = new UniformIntPairGenerator(500, 40, false); // re-created with the same arguments instead of being reset
+ generator2.reset();
+ input2.reset();
+
+ // compare with iterator values
+ ReusingBuildSecondHashJoinIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildSecondHashJoinIterator<>(
+ input1, input2, this.pairSerializer, this.pairComparator,
+ this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
+ this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+
+ iterator.open();
+ try { // close() must run even if an assertion in the join function throws; otherwise afterTest()'s memory check presumably fails as well
+ while (iterator.callWithNextKey(matcher, collector)); // empty body is intentional: all checking happens in the join function
+ } finally {
+ iterator.close();
+ }
+ // assert that each expected match was seen
+ for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBuildSecondWithMixedDataTypes() { // NOTE(review): despite the name, this test drives ReusingBuildFirstHashJoinIterator (IntPair input 1, tuple input 2)
+ try {
+ MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
+
+ final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+ // collect expected data
+ final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = joinIntPairs(
+ collectIntPairData(input1),
+ collectTupleData(input2));
+
+ final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+ // reset the generators
+ input1 = new UniformIntPairGenerator(500, 40, false); // re-created with the same arguments instead of being reset
+ generator2.reset();
+ input2.reset();
+
+ // compare with iterator values
+ ReusingBuildFirstHashJoinIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstHashJoinIterator<>(
+ input1, input2, this.pairSerializer, this.pairComparator,
+ this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
+ this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+
+ iterator.open();
+ try { // close() must run even if an assertion in the join function throws; otherwise afterTest()'s memory check presumably fails as well
+ while (iterator.callWithNextKey(matcher, collector)); // empty body is intentional: all checking happens in the join function
+ } finally {
+ iterator.close();
+ }
+ // assert that each expected match was seen
+ for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBuildFirstJoinWithEmptyBuild() { // NOTE(review): both inputs are populated here; the expectation comes from rightOuterJoinTuples - confirm the name is still accurate
+ try {
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+ // collect expected data
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
+ collectTupleData(input1),
+ collectTupleData(input2));
+
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); // removes each produced match from the map
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+ // reset the generators
+ generator1.reset();
+ generator2.reset();
+ input1.reset();
+ input2.reset();
+
+ // compare with iterator values
+ ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstHashJoinIterator<>(
+ input1, input2, this.recordSerializer, this.record1Comparator,
+ this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+ this.memoryManager, ioManager, this.parentTask, 1.0, true, false); // last two flags are (true, false) here, (false, true) in the inner-join tests
+
+ iterator.open();
+ try { // close() must run even if an assertion in the join function throws; otherwise afterTest()'s memory check presumably fails as well
+ while (iterator.callWithNextKey(matcher, collector)); // empty body is intentional: all checking happens in the join function
+ } finally {
+ iterator.close();
+ }
+ // assert that each expected match was seen
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+ @Test
+ public void testBuildSecondJoinWithEmptyBuild() { // NOTE(review): both inputs are populated here; the expectation comes from leftOuterJoinTuples - confirm the name is still accurate
+ try {
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+ // collect expected data
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
+ collectTupleData(input1),
+ collectTupleData(input2));
+
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); // removes each produced match from the map
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+ // reset the generators
+ generator1.reset();
+ generator2.reset();
+ input1.reset();
+ input2.reset();
+
+ // compare with iterator values
+ ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildSecondHashJoinIterator<>(
+ input1, input2, this.recordSerializer, this.record1Comparator,
+ this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+ this.memoryManager, ioManager, this.parentTask, 1.0, true, false); // last two flags are (true, false) here, (false, true) in the inner-join tests
+
+ iterator.open();
+ try { // close() must run even if an assertion in the join function throws; otherwise afterTest()'s memory check presumably fails as well
+ while (iterator.callWithNextKey(matcher, collector)); // empty body is intentional: all checking happens in the join function
+ } finally {
+ iterator.close();
+ }
+ // assert that each expected match was seen
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+ if (!entry.getValue().isEmpty()) {
+ Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+ }
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("An exception occurred during the test: " + e.getMessage());
+ }
+ }
+
+
+ // --------------------------------------------------------------------------------------------
+ // Utilities
+ // --------------------------------------------------------------------------------------------
+
+
+ static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> // join function that checks off every produced pair against a map of expected matches
+ {
+ private final Map<Integer, Collection<TupleMatch>> toRemoveFrom; // expected matches per key; shrinks as matches are produced
+
+ protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
+ this.toRemoveFrom = map; // kept by reference: the tests inspect this same map after the join has run
+ }
+
+ @Override
+ public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
+ {
+ Integer key = rec1 != null ? rec1.f0 : rec2.f0;
+ String value1 = rec1 != null ? rec1.f1 : null;
+ String value2 = rec2 != null ? rec2.f1 : null;
+ // either record may be null; the key comes from rec1 when present, otherwise from rec2, and a missing side contributes a null value
+ Collection<TupleMatch> matches = this.toRemoveFrom.get(key);
+ if (matches == null) {
+ Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected."); // throws, so matches is non-null below
+ }
+
+ Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
+ matches.remove(new TupleMatch(value1, value2))); // remove() returns false if this pair was not expected (or was already consumed)
+
+ if (matches.isEmpty()) {
+ this.toRemoveFrom.remove(key); // drop fully matched keys from the map
+ }
+ }
+ }
+
+ static final class TupleIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> // join function that checks off every produced IntPair/tuple pair against a map of expected matches
+ {
+ private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom; // expected matches per key; shrinks as matches are produced
+
+ protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
+ this.toRemoveFrom = map; // kept by reference: the tests inspect this same map after the join has run
+ }
+
+ @Override
+ public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
+ {
+ final int k = rec1.getKey(); // unlike TupleMatchRemovingJoin, neither record is null-checked here
+ final int v = rec1.getValue();
+
+ final Integer key = rec2.f0;
+ final String value = rec2.f1;
+
+ Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", k == key); // Integer is unboxed, so this compares values
+
+ Collection<TupleIntPairMatch> matches = this.toRemoveFrom.get(key);
+ if (matches == null) {
+ Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected."); // throws, so matches is non-null below
+ }
+
+ Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value,
+ matches.remove(new TupleIntPairMatch(v, value))); // remove() returns false if this pair was not expected (or was already consumed)
+
+ if (matches.isEmpty()) {
+ this.toRemoveFrom.remove(key); // drop fully matched keys from the map
+ }
+ }
+ }
+
+ static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>> // compares a tuple candidate's key (f0) against the key of an IntPair reference
+ {
+ private int reference; // key of the IntPair given to the last setReference() call
+
+ @Override
+ public void setReference(IntPair reference) {
+ this.reference = reference.getKey();
+ }
+
+ @Override
+ public boolean equalToReference(Tuple2<Integer, String> candidate) {
+ try {
+ return candidate.f0 == this.reference; // Integer is unboxed here; a null key raises the NPE translated below
+ } catch (NullPointerException npex) {
+ throw new NullKeyFieldException();
+ }
+ }
+
+ @Override
+ public int compareToReference(Tuple2<Integer, String> candidate) {
+ try {
+ return Integer.compare(candidate.f0, this.reference); // overflow-safe; same sign as candidate.f0 - reference whenever that subtraction does not overflow
+ } catch (NullPointerException npex) {
+ throw new NullKeyFieldException();
+ }
+ }
+ }
+
+ static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair> // compares an IntPair candidate's key against the key (f0) of a tuple reference
+ {
+ private int reference; // f0 of the tuple given to the last setReference() call
+
+ @Override
+ public void setReference(Tuple2<Integer, String> reference) {
+ this.reference = reference.f0; // unboxes; a null key fails here with a plain NullPointerException
+ }
+
+ @Override
+ public boolean equalToReference(IntPair candidate) {
+ return this.reference == candidate.getKey();
+ }
+
+ @Override
+ public int compareToReference(IntPair candidate) {
+ return Integer.compare(candidate.getKey(), this.reference); // overflow-safe; same sign as candidate.getKey() - reference whenever that subtraction does not overflow
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5671c77c/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
deleted file mode 100644
index 12f4a32..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashMatchIteratorITCase.java
+++ /dev/null
@@ -1,768 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators.hash;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.GenericPairComparator;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypePairComparator;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
-import org.apache.flink.runtime.operators.testutils.UnionIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-@SuppressWarnings({"serial", "deprecation"})
-public class ReusingHashMatchIteratorITCase {
-
- private static final int MEMORY_SIZE = 16000000; // total memory
-
- private static final int INPUT_1_SIZE = 20000;
- private static final int INPUT_2_SIZE = 1000;
-
- private static final long SEED1 = 561349061987311L;
- private static final long SEED2 = 231434613412342L;
-
- private final AbstractInvokable parentTask = new DummyInvokable();
-
- private IOManager ioManager;
- private MemoryManager memoryManager;
-
- private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
- private TypeComparator<Tuple2<Integer, String>> record1Comparator;
- private TypeComparator<Tuple2<Integer, String>> record2Comparator;
- private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
-
- private TypeSerializer<IntPair> pairSerializer;
- private TypeComparator<IntPair> pairComparator;
- private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator;
- private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator;
-
-
- @SuppressWarnings("unchecked")
- @Before
- public void beforeTest() {
- this.recordSerializer = TestData.getIntStringTupleSerializer();
-
- this.record1Comparator = TestData.getIntStringTupleComparator();
- this.record2Comparator = TestData.getIntStringTupleComparator();
-
- this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator);
-
- this.pairSerializer = new IntPairSerializer();
- this.pairComparator = new TestData.IntPairComparator();
- this.pairRecordPairComparator = new IntPairTuplePairComparator();
- this.recordPairPairComparator = new TupleIntPairPairComparator();
-
- this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
- this.ioManager = new IOManagerAsync();
- }
-
- @After
- public void afterTest() {
- if (this.ioManager != null) {
- this.ioManager.shutdown();
- if (!this.ioManager.isProperlyShutDown()) {
- Assert.fail("I/O manager failed to properly shut down.");
- }
- this.ioManager = null;
- }
-
- if (this.memoryManager != null) {
- Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.",
- this.memoryManager.verifyEmpty());
- this.memoryManager.shutdown();
- this.memoryManager = null;
- }
- }
-
-
- @Test
- public void testBuildFirst() {
- try {
- TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
- final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- // collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
- collectTupleData(input1),
- collectTupleData(input2));
-
- final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- // reset the generators
- generator1.reset();
- generator2.reset();
- input1.reset();
- input2.reset();
-
- // compare with iterator values
- ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new ReusingBuildFirstHashMatchIterator<>(
- input1, input2, this.recordSerializer, this.record1Comparator,
- this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildFirstWithHighNumberOfCommonKeys()
- {
- // the size of the left and right inputs
- final int INPUT_1_SIZE = 200;
- final int INPUT_2_SIZE = 100;
-
- final int INPUT_1_DUPLICATES = 10;
- final int INPUT_2_DUPLICATES = 2000;
- final int DUPLICATE_KEY = 13;
-
- try {
- TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
- final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
- final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
-
- final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
- inList1.add(gen1Iter);
- inList1.add(const1Iter);
-
- final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
- inList2.add(gen2Iter);
- inList2.add(const2Iter);
-
- MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
- MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
-
-
- // collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
- collectTupleData(input1),
- collectTupleData(input2));
-
- // re-create the whole thing for actual processing
-
- // reset the generators and iterators
- generator1.reset();
- generator2.reset();
- const1Iter.reset();
- const2Iter.reset();
- gen1Iter.reset();
- gen2Iter.reset();
-
- inList1.clear();
- inList1.add(gen1Iter);
- inList1.add(const1Iter);
-
- inList2.clear();
- inList2.add(gen2Iter);
- inList2.add(const2Iter);
-
- input1 = new UnionIterator<Tuple2<Integer, String>>(inList1);
- input2 = new UnionIterator<Tuple2<Integer, String>>(inList2);
-
- final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new ReusingBuildFirstHashMatchIterator<>(
- input1, input2, this.recordSerializer, this.record1Comparator,
- this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildSecond() {
- try {
- TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
- final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- // collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
- collectTupleData(input1),
- collectTupleData(input2));
-
- final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- // reset the generators
- generator1.reset();
- generator2.reset();
- input1.reset();
- input2.reset();
-
- // compare with iterator values
- ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>,Tuple2<Integer, String> ,Tuple2<Integer, String> > iterator =
- new ReusingBuildSecondHashMatchIterator<>(
- input1, input2, this.recordSerializer, this.record1Comparator,
- this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildSecondWithHighNumberOfCommonKeys()
- {
- // the size of the left and right inputs
- final int INPUT_1_SIZE = 200;
- final int INPUT_2_SIZE = 100;
-
- final int INPUT_1_DUPLICATES = 10;
- final int INPUT_2_DUPLICATES = 2000;
- final int DUPLICATE_KEY = 13;
-
- try {
- TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
- final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
- final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
-
- final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
- inList1.add(gen1Iter);
- inList1.add(const1Iter);
-
- final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
- inList2.add(gen2Iter);
- inList2.add(const2Iter);
-
- MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
- MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
-
-
- // collect expected data
- final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
- collectTupleData(input1),
- collectTupleData(input2));
-
- // re-create the whole thing for actual processing
-
- // reset the generators and iterators
- generator1.reset();
- generator2.reset();
- const1Iter.reset();
- const2Iter.reset();
- gen1Iter.reset();
- gen2Iter.reset();
-
- inList1.clear();
- inList1.add(gen1Iter);
- inList1.add(const1Iter);
-
- inList2.clear();
- inList2.add(gen2Iter);
- inList2.add(const2Iter);
-
- input1 = new UnionIterator<>(inList1);
- input2 = new UnionIterator<>(inList2);
-
- final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new ReusingBuildSecondHashMatchIterator<>(
- input1, input2, this.recordSerializer, this.record1Comparator,
- this.recordSerializer, this.record2Comparator, this.recordPairComparator,
- this.memoryManager, ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildFirstWithMixedDataTypes() {
- try {
- MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
-
- final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- // collect expected data
- final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
- collectIntPairData(input1),
- collectTupleData(input2));
-
- final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- // reset the generators
- input1 = new UniformIntPairGenerator(500, 40, false);
- generator2.reset();
- input2.reset();
-
- // compare with iterator values
- ReusingBuildSecondHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new ReusingBuildSecondHashMatchIterator<>(
- input1, input2, this.pairSerializer, this.pairComparator,
- this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
- this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- @Test
- public void testBuildSecondWithMixedDataTypes() {
- try {
- MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
-
- final TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
- // collect expected data
- final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
- collectIntPairData(input1),
- collectTupleData(input2));
-
- final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
- final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
- // reset the generators
- input1 = new UniformIntPairGenerator(500, 40, false);
- generator2.reset();
- input2.reset();
-
- // compare with iterator values
- ReusingBuildFirstHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
- new ReusingBuildFirstHashMatchIterator<>(
- input1, input2, this.pairSerializer, this.pairComparator,
- this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
- this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
-
- iterator.open();
-
- while (iterator.callWithNextKey(matcher, collector));
-
- iterator.close();
-
- // assert that each expected match was seen
- for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
- if (!entry.getValue().isEmpty()) {
- Assert.fail("Collection for key " + entry.getKey() + " is not empty");
- }
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail("An exception occurred during the test: " + e.getMessage());
- }
- }
-
- // --------------------------------------------------------------------------------------------
- // Utilities
- // --------------------------------------------------------------------------------------------
-
-
-
- static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
- Map<Integer, Collection<String>> leftMap,
- Map<Integer, Collection<String>> rightMap)
- {
- Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
-
- for (Integer key : leftMap.keySet()) {
- Collection<String> leftValues = leftMap.get(key);
- Collection<String> rightValues = rightMap.get(key);
-
- if (rightValues == null) {
- continue;
- }
-
- if (!map.containsKey(key)) {
- map.put(key, new ArrayList<TupleMatch>());
- }
-
- Collection<TupleMatch> matchedValues = map.get(key);
-
- for (String leftValue : leftValues) {
- for (String rightValue : rightValues) {
- matchedValues.add(new TupleMatch(leftValue, rightValue));
- }
- }
- }
-
- return map;
- }
-
- static Map<Integer, Collection<TupleIntPairMatch>> matchTupleIntPairValues(
- Map<Integer, Collection<Integer>> leftMap,
- Map<Integer, Collection<String>> rightMap)
- {
- final Map<Integer, Collection<TupleIntPairMatch>> map = new HashMap<>();
-
- for (Integer i : leftMap.keySet()) {
-
- final Integer key = new Integer(i.intValue());
-
- final Collection<Integer> leftValues = leftMap.get(i);
- final Collection<String> rightValues = rightMap.get(key);
-
- if (rightValues == null) {
- continue;
- }
-
- if (!map.containsKey(key)) {
- map.put(key, new ArrayList<TupleIntPairMatch>());
- }
-
- final Collection<TupleIntPairMatch> matchedValues = map.get(key);
-
- for (Integer v : leftValues) {
- for (String val : rightValues) {
- matchedValues.add(new TupleIntPairMatch(v, val));
- }
- }
- }
-
- return map;
- }
-
-
- static Map<Integer, Collection<String>> collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
- throws Exception
- {
- Map<Integer, Collection<String>> map = new HashMap<>();
- Tuple2<Integer, String> pair = new Tuple2<>();
-
- while ((pair = iter.next(pair)) != null) {
-
- Integer key = pair.f0;
- if (!map.containsKey(key)) {
- map.put(key, new ArrayList<String>());
- }
-
- Collection<String> values = map.get(key);
- values.add(pair.f1);
- }
-
- return map;
- }
-
- static Map<Integer, Collection<Integer>> collectIntPairData(MutableObjectIterator<IntPair> iter)
- throws Exception
- {
- Map<Integer, Collection<Integer>> map = new HashMap<>();
- IntPair pair = new IntPair();
-
- while ((pair = iter.next(pair)) != null) {
-
- final int key = pair.getKey();
- final int value = pair.getValue();
- if (!map.containsKey(key)) {
- map.put(key, new ArrayList<Integer>());
- }
-
- Collection<Integer> values = map.get(key);
- values.add(value);
- }
-
- return map;
- }
-
- /**
- * Private class used for storage of the expected matches in a hash-map.
- */
- static class TupleMatch {
-
- private final String left;
- private final String right;
-
- public TupleMatch(String left, String right) {
- this.left = left;
- this.right = right;
- }
-
- @Override
- public boolean equals(Object obj) {
- TupleMatch o = (TupleMatch) obj;
- return this.left.equals(o.left) && this.right.equals(o.right);
- }
-
- @Override
- public int hashCode() {
- return this.left.hashCode() ^ this.right.hashCode();
- }
-
- @Override
- public String toString() {
- return left + ", " + right;
- }
- }
-
- /**
- * Private class used for storage of the expected matches in a hash-map.
- */
- static class TupleIntPairMatch
- {
- private final int left;
- private final String right;
-
- public TupleIntPairMatch(int left, String right) {
- this.left = left;
- this.right = right;
- }
-
- @Override
- public boolean equals(Object obj) {
- TupleIntPairMatch o = (TupleIntPairMatch) obj;
- return this.left == o.left && this.right.equals(o.right);
- }
-
- @Override
- public int hashCode() {
- return this.left ^ this.right.hashCode();
- }
-
- @Override
- public String toString() {
- return left + ", " + right;
- }
- }
-
- static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
- {
- private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
-
- protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
- this.toRemoveFrom = map;
- }
-
- @Override
- public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
- {
- Integer key = rec1.f0;
- String value1 = rec1.f1;
- String value2 = rec2.f1;
- //System.err.println("rec1 key = "+key+" rec2 key= "+rec2.getField(0, TestData.Key.class));
- Collection<TupleMatch> matches = this.toRemoveFrom.get(key);
- if (matches == null) {
- Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
- }
-
- Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
- matches.remove(new TupleMatch(value1, value2)));
-
- if (matches.isEmpty()) {
- this.toRemoveFrom.remove(key);
- }
- }
- }
-
- static final class TupleIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
- {
- private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom;
-
- protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
- this.toRemoveFrom = map;
- }
-
- @Override
- public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
- {
- final int k = rec1.getKey();
- final int v = rec1.getValue();
-
- final Integer key = rec2.f0;
- final String value = rec2.f1;
-
- Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", k == key);
-
- Collection<TupleIntPairMatch> matches = this.toRemoveFrom.get(key);
- if (matches == null) {
- Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected.");
- }
-
- Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value,
- matches.remove(new TupleIntPairMatch(v, value)));
-
- if (matches.isEmpty()) {
- this.toRemoveFrom.remove(key);
- }
- }
- }
-
- static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
- {
- private int reference;
-
- @Override
- public void setReference(IntPair reference) {
- this.reference = reference.getKey();
- }
-
- @Override
- public boolean equalToReference(Tuple2<Integer, String> candidate) {
- try {
- return candidate.f0 == this.reference;
- } catch (NullPointerException npex) {
- throw new NullKeyFieldException();
- }
- }
-
- @Override
- public int compareToReference(Tuple2<Integer, String> candidate) {
- try {
- return candidate.f0 - this.reference;
- } catch (NullPointerException npex) {
- throw new NullKeyFieldException();
- }
- }
- }
-
- static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
- {
- private int reference;
-
- @Override
- public void setReference(Tuple2<Integer, String> reference) {
- this.reference = reference.f0;
- }
-
- @Override
- public boolean equalToReference(IntPair candidate) {
- return this.reference == candidate.getKey();
- }
-
- @Override
- public int compareToReference(IntPair candidate) {
- return candidate.getKey() - this.reference;
- }
- }
-}