You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/15 15:07:21 UTC
[2/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index 2427edd..d12307a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -18,13 +18,24 @@
package org.apache.flink.runtime.operators.testutils;
-import java.util.Comparator;
+import java.io.IOException;
import java.util.Random;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.operators.testutils.types.IntPair;
import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
import org.apache.flink.util.MutableObjectIterator;
/**
@@ -38,250 +49,6 @@ public final class TestData {
private TestData() {}
/**
- * Key comparator.
- */
- public static class KeyComparator implements Comparator<Key> {
- @Override
- public int compare(Key k1, Key k2) {
- return k1.compareTo(k2);
- }
- };
-
- /**
- * Key implementation.
- */
- public static class Key extends IntValue {
- private static final long serialVersionUID = 1L;
-
- public Key() {
- super();
- }
-
- public Key(int k) {
- super(k);
- }
-
- public int getKey() {
- return getValue();
- }
-
- public void setKey(int key) {
- setValue(key);
- }
- }
-
- /**
- * Value implementation.
- */
- public static class Value extends StringValue {
-
- private static final long serialVersionUID = 1L;
-
- public Value() {
- super();
- }
-
- public Value(String v) {
- super(v);
- }
-
- @Override
- public boolean equals(final Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (obj.getClass() == TestData.Value.class) {
- final StringValue other = (StringValue) obj;
- int len = this.length();
-
- if (len == other.length()) {
- final char[] tc = this.getCharArray();
- final char[] oc = other.getCharArray();
- int i = 0, j = 0;
-
- while (len-- != 0) {
- if (tc[i++] != oc[j++]) {
- return false;
- }
- }
- return true;
- }
- }
- return false;
- }
- }
-
- /**
- * Pair generator.
- */
- public static class Generator implements MutableObjectIterator<Record> {
-
- public enum KeyMode {
- SORTED, RANDOM
- };
-
- public enum ValueMode {
- FIX_LENGTH, RANDOM_LENGTH, CONSTANT
- };
-
- private static char[] alpha = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'a', 'b', 'c',
- 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm' };
-
- private final long seed;
-
- private final int keyMax;
-
- private final int valueLength;
-
- private final KeyMode keyMode;
-
- private final ValueMode valueMode;
-
- private Random random;
-
- private int counter;
-
- private Key key;
- private Value value;
-
- public Generator(long seed, int keyMax, int valueLength) {
- this(seed, keyMax, valueLength, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
- }
-
- public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode) {
- this(seed, keyMax, valueLength, keyMode, valueMode, null);
- }
-
- public Generator(long seed, int keyMax, int valueLength, KeyMode keyMode, ValueMode valueMode, Value constant) {
- this.seed = seed;
- this.keyMax = keyMax;
- this.valueLength = valueLength;
- this.keyMode = keyMode;
- this.valueMode = valueMode;
-
- this.random = new Random(seed);
- this.counter = 0;
-
- this.key = new Key();
- this.value = constant == null ? new Value() : constant;
- }
-
- public Record next(Record reuse) {
- this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
- if (this.valueMode != ValueMode.CONSTANT) {
- this.value.setValue(randomString());
- }
- reuse.setField(0, this.key);
- reuse.setField(1, this.value);
- return reuse;
- }
-
- public Record next() {
- return next(new Record(2));
- }
-
- public boolean next(org.apache.flink.types.Value[] target) {
- this.key.setKey(keyMode == KeyMode.SORTED ? ++counter : Math.abs(random.nextInt() % keyMax) + 1);
- // TODO change this to something proper
- ((IntValue)target[0]).setValue(this.key.getValue());
- ((IntValue)target[1]).setValue(random.nextInt());
- return true;
- }
-
- public int sizeOf(Record rec) {
- // key
- int valueLength = Integer.SIZE / 8;
-
- // value
- String text = rec.getField(1, Value.class).getValue();
- int strlen = text.length();
- int utflen = 0;
- int c;
- for (int i = 0; i < strlen; i++) {
- c = text.charAt(i);
- if ((c >= 0x0001) && (c <= 0x007F)) {
- utflen++;
- } else if (c > 0x07FF) {
- utflen += 3;
- } else {
- utflen += 2;
- }
- }
- valueLength += 2 + utflen;
-
- return valueLength;
- }
-
- public void reset() {
- this.random = new Random(seed);
- this.counter = 0;
- }
-
- private String randomString() {
- int length;
-
- if (valueMode == ValueMode.FIX_LENGTH) {
- length = valueLength;
- } else {
- length = valueLength - random.nextInt(valueLength / 3);
- }
-
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < length; i++) {
- sb.append(alpha[random.nextInt(alpha.length)]);
- }
- return sb.toString();
- }
-
- }
-
- /**
- * Record reader mock.
- */
- public static class GeneratorIterator implements MutableObjectIterator<Record> {
-
- private final Generator generator;
-
- private final int numberOfRecords;
-
- private int counter;
-
- public GeneratorIterator(Generator generator, int numberOfRecords) {
- this.generator = generator;
- this.generator.reset();
- this.numberOfRecords = numberOfRecords;
- this.counter = 0;
- }
-
- @Override
- public Record next(Record target) {
- if (counter < numberOfRecords) {
- counter++;
- return generator.next(target);
- }
- else {
- return null;
- }
- }
-
- @Override
- public Record next() {
- if (counter < numberOfRecords) {
- counter++;
- return generator.next();
- }
- else {
- return null;
- }
- }
-
- public void reset() {
- this.counter = 0;
- }
- }
-
- /**
* Tuple2<Integer, String> generator.
*/
public static class TupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> {
@@ -398,9 +165,8 @@ public final class TestData {
}
-
/**
- * Record reader mock.
+ * Tuple reader mock.
*/
public static class TupleGeneratorIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
@@ -443,35 +209,31 @@ public final class TestData {
this.counter = 0;
}
}
-
- // --------------------------------------------------------------------------------------------
-
- public static class ConstantValueIterator implements MutableObjectIterator<Record> {
-
- private final Key key;
- private final Value value;
-
+
+ public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+ private int key;
+ private String value;
+
private final String valueValue;
-
-
+
+
private final int numPairs;
-
+
private int pos;
-
-
- public ConstantValueIterator(int keyValue, String valueValue, int numPairs) {
- this.key = new Key(keyValue);
- this.value = new Value();
+
+
+ public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) {
+ this.key = keyValue;
this.valueValue = valueValue;
this.numPairs = numPairs;
}
-
+
@Override
- public Record next(Record reuse) {
+ public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
if (pos < this.numPairs) {
- this.value.setValue(this.valueValue + ' ' + pos);
- reuse.setField(0, this.key);
- reuse.setField(1, this.value);
+ this.value = this.valueValue + ' ' + pos;
+ reuse.setFields(this.key, this.value);
pos++;
return reuse;
}
@@ -481,8 +243,8 @@ public final class TestData {
}
@Override
- public Record next() {
- return next(new Record(2));
+ public Tuple2<Integer, String> next() {
+ return next(new Tuple2<Integer, String>());
}
public void reset() {
@@ -490,45 +252,307 @@ public final class TestData {
}
}
- public static class TupleConstantValueIterator implements MutableObjectIterator<Tuple2<Integer, String>> {
+ /**
+ * An iterator that returns the same key/value pair a given number of times.
+ */
+ public static final class ConstantIntIntTuplesIterator implements MutableObjectIterator<Tuple2<Integer, Integer>> {
- private int key;
- private String value;
+ private final int key;
+ private final int value;
- private final String valueValue;
+ private int numLeft;
+ public ConstantIntIntTuplesIterator(int key, int value, int count) {
+ this.key = key;
+ this.value = value;
+ this.numLeft = count;
+ }
- private final int numPairs;
+ /**
+  * Fills {@code reuse} with the constant key (field 0) and value (field 1) while elements remain.
+  *
+  * @param reuse the tuple to overwrite and return
+  * @return {@code reuse}, or {@code null} once the configured count is used up
+  */
+ @Override
+ public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> reuse) {
+     // exhausted: signal end of input
+     if (this.numLeft <= 0) {
+         return null;
+     }
+
+     this.numLeft--;
+     reuse.setField(this.key, 0);
+     reuse.setField(this.value, 1);
+     return reuse;
+ }
- private int pos;
+ @Override
+ public Tuple2<Integer, Integer> next() {
+ return next(new Tuple2<>(0, 0));
+ }
+ }
+ //----Tuple2<Integer, String>
+ private static final TupleTypeInfo<Tuple2<Integer, String>> typeInfoIntString = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, String.class);
- public TupleConstantValueIterator(int keyValue, String valueValue, int numPairs) {
- this.key = keyValue;
- this.valueValue = valueValue;
- this.numPairs = numPairs;
+ private static final TypeSerializerFactory<Tuple2<Integer, String>> serializerFactoryIntString = new MockTupleSerializerFactory(typeInfoIntString);
+
+ public static TupleTypeInfo<Tuple2<Integer, String>> getIntStringTupleTypeInfo() {
+ return typeInfoIntString;
+ }
+
+ public static TypeSerializerFactory<Tuple2<Integer, String>> getIntStringTupleSerializerFactory() {
+ return serializerFactoryIntString;
+ }
+
+ public static TypeSerializer<Tuple2<Integer, String>> getIntStringTupleSerializer() {
+ return serializerFactoryIntString.getSerializer();
+ }
+
+ public static TypeComparator<Tuple2<Integer, String>> getIntStringTupleComparator() {
+ return getIntStringTupleTypeInfo().createComparator(new int[]{0}, new boolean[]{true}, 0, null);
+ }
+
+ public static MockTuple2Reader<Tuple2<Integer, String>> getIntStringTupleReader() {
+ return new MockTuple2Reader<Tuple2<Integer, String>>();
+ }
+
+ //----Tuple2<Integer, Integer>
+ private static final TupleTypeInfo<Tuple2<Integer, Integer>> typeInfoIntInt = TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class);
+
+ private static final TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactoryIntInt = new MockTupleSerializerFactory(typeInfoIntInt);
+
+ public static TupleTypeInfo<Tuple2<Integer, Integer>> getIntIntTupleTypeInfo() {
+ return typeInfoIntInt;
+ }
+
+ public static TypeSerializerFactory<Tuple2<Integer, Integer>> getIntIntTupleSerializerFactory() {
+ return serializerFactoryIntInt;
+ }
+
+ public static TypeSerializer<Tuple2<Integer, Integer>> getIntIntTupleSerializer() {
+ return getIntIntTupleSerializerFactory().getSerializer();
+ }
+
+ public static TypeComparator<Tuple2<Integer, Integer>> getIntIntTupleComparator() {
+ return getIntIntTupleTypeInfo().createComparator(new int[]{0}, new boolean[]{true}, 0, null);
+ }
+
+ public static MockTuple2Reader<Tuple2<Integer, Integer>> getIntIntTupleReader() {
+ return new MockTuple2Reader<>();
+ }
+
+ //----Tuple2<?, ?>
+ private static class MockTupleSerializerFactory<T extends Tuple> implements TypeSerializerFactory<T> {
+ private final TupleTypeInfo<T> info;
+
+ public MockTupleSerializerFactory(TupleTypeInfo<T> info) {
+ this.info = info;
}
@Override
- public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
- if (pos < this.numPairs) {
- this.value = this.valueValue + ' ' + pos;
- reuse.setFields(this.key, this.value);
- pos++;
+ public void writeParametersToConfig(Configuration config) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public TypeSerializer<T> getSerializer() {
+ return info.createSerializer(null);
+ }
+
+ @Override
+ public Class<T> getDataType() {
+ return info.getTypeClass();
+ }
+ }
+
+ public static class MockTuple2Reader<T extends Tuple2> implements MutableObjectIterator<T> {
+ private final Tuple2 SENTINEL = new Tuple2();
+
+ private final BlockingQueue<Tuple2> queue;
+
+ public MockTuple2Reader() {
+ this.queue = new ArrayBlockingQueue<Tuple2>(32, false);
+ }
+
+ public MockTuple2Reader(int size) {
+ this.queue = new ArrayBlockingQueue<Tuple2>(size, false);
+ }
+
+ /**
+  * Blocks until an element is available and copies its two fields into {@code reuse}.
+  *
+  * @param reuse the tuple to overwrite and return
+  * @return {@code reuse}, or {@code null} once {@link #close()} has enqueued the sentinel
+  * @throws RuntimeException if the calling thread is interrupted while waiting; the
+  *         interrupt flag is restored and the InterruptedException is attached as cause
+  */
+ @Override
+ public T next(T reuse) {
+     // take() blocks until an element exists and never yields null, so no retry loop is needed
+     final Tuple2 r;
+     try {
+         r = queue.take();
+     } catch (InterruptedException iex) {
+         Thread.currentThread().interrupt();
+         throw new RuntimeException("Reader was interrupted.", iex);
+     }
+
+     // identity check: emit() always enqueues a fresh copy, so only close() can enqueue
+     // SENTINEL itself; equals() would also match a regular tuple with two null fields
+     if (r == SENTINEL) {
+         // put the sentinel back, to ensure that repeated calls do not block
+         try {
+             queue.put(r);
+         } catch (InterruptedException e) {
+             Thread.currentThread().interrupt();
+             throw new RuntimeException("Reader was interrupted.", e);
+         }
+         return null;
+     } else {
+         reuse.setField(r.getField(0), 0);
+         reuse.setField(r.getField(1), 1);
return reuse;
}
- else {
+ }
+
+ /**
+  * Blocks until an element is available and returns it as a newly created tuple.
+  *
+  * @return a new tuple holding the element's two fields, or {@code null} once
+  *         {@link #close()} has enqueued the sentinel
+  * @throws RuntimeException if the calling thread is interrupted while waiting; the
+  *         interrupt flag is restored and the InterruptedException is attached as cause
+  */
+ @Override
+ public T next() {
+     // take() blocks until an element exists and never yields null, so no retry loop is needed
+     final Tuple2 r;
+     try {
+         r = queue.take();
+     } catch (InterruptedException iex) {
+         Thread.currentThread().interrupt();
+         throw new RuntimeException("Reader was interrupted.", iex);
+     }
+
+     // identity check: emit() always enqueues a fresh copy, so only close() can enqueue
+     // SENTINEL itself; equals() would also match a regular tuple with two null fields
+     if (r == SENTINEL) {
+         // put the sentinel back, to ensure that repeated calls do not block
+         try {
+             queue.put(r);
+         } catch (InterruptedException e) {
+             Thread.currentThread().interrupt();
+             throw new RuntimeException("Reader was interrupted.", e);
+         }
return null;
+     } else {
+         Tuple2 result = new Tuple2(r.f0, r.f1);
+         return (T) result;
+     }
+ }
+
+ public void emit(Tuple2 element) throws InterruptedException {
+ queue.put(new Tuple2(element.f0, element.f1));
+ }
+
+ public void close() {
+ try {
+ queue.put(SENTINEL);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}
+ }
+
+ public static class IntPairComparator extends TypeComparator<IntPair> {
+
+ private static final long serialVersionUID = 1L;
+
+ private int reference;
+
+ private final TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)};
@Override
- public Tuple2<Integer, String> next() {
- return next(new Tuple2<Integer, String>());
+ public int hash(IntPair object) {
+ return comparators[0].hash(object.getKey());
}
- public void reset() {
- this.pos = 0;
+ @Override
+ public void setReference(IntPair toCompare) {
+ this.reference = toCompare.getKey();
+ }
+
+ @Override
+ public boolean equalToReference(IntPair candidate) {
+ return candidate.getKey() == this.reference;
+ }
+
+ /**
+  * Compares the reference key stored in the given comparator with this comparator's reference key.
+  *
+  * <p>Uses {@link Integer#compare(int, int)} rather than subtraction, because the difference of
+  * two ints overflows (and flips sign) when the keys are far apart, e.g. a large positive key
+  * against a negative one.
+  *
+  * @param referencedAccessors the other comparator; cast to {@code IntPairComparator} unconditionally
+  * @return a negative value, zero, or a positive value if the other reference is less than,
+  *         equal to, or greater than this reference
+  */
+ @Override
+ public int compareToReference(TypeComparator<IntPair> referencedAccessors) {
+     final IntPairComparator comp = (IntPairComparator) referencedAccessors;
+     return Integer.compare(comp.reference, this.reference);
+ }
+
+ /**
+  * Orders two pairs by their key.
+  *
+  * <p>Uses {@link Integer#compare(int, int)} rather than subtraction, because the difference of
+  * two ints overflows (and flips sign) when the keys are far apart.
+  *
+  * @return a negative value, zero, or a positive value if the first key is less than,
+  *         equal to, or greater than the second key
+  */
+ @Override
+ public int compare(IntPair first, IntPair second) {
+     return Integer.compare(first.getKey(), second.getKey());
+ }
+
+ /**
+  * Reads one int from each source (first {@code source1}, then {@code source2}) and compares them.
+  *
+  * <p>Uses {@link Integer#compare(int, int)} rather than subtraction, because the difference of
+  * two ints overflows (and flips sign) when the values are far apart.
+  *
+  * @return a negative value, zero, or a positive value if the int read from {@code source1} is
+  *         less than, equal to, or greater than the int read from {@code source2}
+  * @throws IOException if reading from either source fails
+  */
+ @Override
+ public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
+     // keep the read order explicit: source1 is consumed before source2
+     final int firstKey = source1.readInt();
+     final int secondKey = source2.readInt();
+     return Integer.compare(firstKey, secondKey);
+ }
+
+ @Override
+ public boolean supportsNormalizedKey() {
+ return true;
+ }
+
+ @Override
+ public int getNormalizeKeyLen() {
+ return 4;
+ }
+
+ @Override
+ public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+ return keyBytes < 4;
+ }
+
+ /**
+  * Writes up to {@code len} bytes of the record's normalized key into {@code target},
+  * starting at {@code offset}.
+  *
+  * <p>For {@code len <= 0} nothing is written. For {@code len < 4} only the most significant
+  * {@code len} bytes are written. For {@code len >= 4} the full key is written big-endian,
+  * followed by zero bytes up to {@code len}.
+  */
+ @Override
+ public void putNormalizedKey(IntPair record, MemorySegment target, int offset, int len) {
+     // subtracting Integer.MIN_VALUE flips the sign bit; see IntValue for documentation of the logic
+     final int normalized = record.getKey() - Integer.MIN_VALUE;
+
+     if (len <= 0) {
+         // no room for any key byte
+         return;
+     }
+
+     if (len < 4) {
+         // only a prefix fits: emit the most significant bytes first
+         for (int i = 0; i < len; i++) {
+             final int shift = (3 - i) << 3;
+             target.put(offset + i, (byte) ((normalized >>> shift) & 0xff));
+         }
+         return;
+     }
+
+     // the whole key fits; pad any bytes beyond the fourth with zeros
+     target.putIntBigEndian(offset, normalized);
+     for (int i = 4; i < len; i++) {
+         target.put(offset + i, (byte) 0);
+     }
+ }
+
+ @Override
+ public boolean invertNormalizedKey() {
+ return false;
+ }
+
+ @Override
+ public IntPairComparator duplicate() {
+ return new IntPairComparator();
+ }
+
+ @Override
+ public int extractKeys(Object record, Object[] target, int index) {
+ target[index] = ((IntPair) record).getKey();
+ return 1;
+ }
+
+ @Override
+ public TypeComparator[] getFlatComparators() {
+ return comparators;
+ }
+
+ @Override
+ public boolean supportsSerializationWithKeyNormalization() {
+ return true;
+ }
+
+ @Override
+ public void writeWithKeyNormalization(IntPair record, DataOutputView target) throws IOException {
+ target.writeInt(record.getKey() - Integer.MIN_VALUE);
+ target.writeInt(record.getValue());
+ }
+
+ @Override
+ public IntPair readWithKeyDenormalization(IntPair reuse, DataInputView source) throws IOException {
+ reuse.setKey(source.readInt() + Integer.MIN_VALUE);
+ reuse.setValue(source.readInt());
+ return reuse;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index f112ff8..4c5a07e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -18,13 +18,12 @@
package org.apache.flink.runtime.operators.util;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -37,10 +36,8 @@ import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
@@ -78,21 +75,21 @@ public class HashVsSortMiniBenchmark {
private IOManager ioManager;
private MemoryManager memoryManager;
- private TypeSerializerFactory<Record> serializer1;
- private TypeSerializerFactory<Record> serializer2;
- private TypeComparator<Record> comparator1;
- private TypeComparator<Record> comparator2;
- private TypePairComparator<Record, Record> pairComparator11;
+ private TypeSerializerFactory<Tuple2<Integer, String>> serializer1;
+ private TypeSerializerFactory<Tuple2<Integer, String>> serializer2;
+ private TypeComparator<Tuple2<Integer, String>> comparator1;
+ private TypeComparator<Tuple2<Integer, String>> comparator2;
+ private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator11;
@SuppressWarnings("unchecked")
@Before
public void beforeTest() {
- this.serializer1 = RecordSerializerFactory.get();
- this.serializer2 = RecordSerializerFactory.get();
- this.comparator1 = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
- this.comparator2 = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
- this.pairComparator11 = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+ this.serializer1 = TestData.getIntStringTupleSerializerFactory();
+ this.serializer2 = TestData.getIntStringTupleSerializerFactory();
+ this.comparator1 = TestData.getIntStringTupleComparator();
+ this.comparator2 = TestData.getIntStringTupleComparator();
+ this.pairComparator11 = new GenericPairComparator(this.comparator1, this.comparator2);
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true);
this.ioManager = new IOManagerAsync();
@@ -120,31 +117,31 @@ public class HashVsSortMiniBenchmark {
public void testSortBothMerge() {
try {
- Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
- final JoinFunction matcher = new NoOpMatcher();
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction matcher = new NoOpMatcher();
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
long start = System.nanoTime();
- final UnilateralSortMerger<Record> sorter1 = new UnilateralSortMerger<Record>(
+ final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>(
this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1,
this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
- final UnilateralSortMerger<Record> sorter2 = new UnilateralSortMerger<Record>(
+ final UnilateralSortMerger<Tuple2<Integer, String>> sorter2 = new UnilateralSortMerger<>(
this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2,
this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
- final MutableObjectIterator<Record> sortedInput1 = sorter1.getIterator();
- final MutableObjectIterator<Record> sortedInput2 = sorter2.getIterator();
+ final MutableObjectIterator<Tuple2<Integer, String>> sortedInput1 = sorter1.getIterator();
+ final MutableObjectIterator<Tuple2<Integer, String>> sortedInput2 = sorter2.getIterator();
// compare with iterator values
- ReusingMergeInnerJoinIterator<Record, Record, Record> iterator =
- new ReusingMergeInnerJoinIterator<Record, Record, Record>(sortedInput1, sortedInput2,
+ ReusingMergeInnerJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingMergeInnerJoinIterator<>(sortedInput1, sortedInput2,
this.serializer1.getSerializer(), this.comparator1, this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
this.memoryManager, this.ioManager, MEMORY_PAGES_FOR_MERGE, this.parentTask);
@@ -170,21 +167,21 @@ public class HashVsSortMiniBenchmark {
@Test
public void testBuildFirst() {
try {
- Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
- final JoinFunction matcher = new NoOpMatcher();
+ final FlatJoinFunction matcher = new NoOpMatcher();
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
long start = System.nanoTime();
// compare with iterator values
- final ReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
- new ReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+ final ReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildFirstHashMatchIterator<>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
@@ -209,21 +206,21 @@ public class HashVsSortMiniBenchmark {
@Test
public void testBuildSecond() {
try {
- Generator generator1 = new Generator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, INPUT_1_SIZE / 10, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, INPUT_2_SIZE, 100, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
- final JoinFunction matcher = new NoOpMatcher();
+ final FlatJoinFunction matcher = new NoOpMatcher();
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
long start = System.nanoTime();
// compare with iterator values
- ReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
- new ReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+ ReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new ReusingBuildSecondHashMatchIterator<>(
input1, input2, this.serializer1.getSerializer(), this.comparator1,
this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, true);
@@ -246,11 +243,11 @@ public class HashVsSortMiniBenchmark {
}
- private static final class NoOpMatcher extends JoinFunction {
+ private static final class NoOpMatcher implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
@Override
- public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception {
+ public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception {
}
}
}