You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/15 15:07:22 UTC
[3/6] flink git commit: [FLINK-2479] Refactor runtime.operators.* tests
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
index e1e2c0a..7b7b940 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.operators.sort;
import java.io.IOException;
-import java.util.Comparator;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -30,9 +29,9 @@ import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -40,12 +39,9 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
@@ -72,9 +68,11 @@ public class CombiningUnilateralSortMergerITCase {
private MemoryManager memoryManager;
- private TypeSerializerFactory<Record> serializerFactory;
+ private TypeSerializerFactory<Tuple2<Integer, String>> serializerFactory1;
+ private TypeSerializerFactory<Tuple2<Integer, Integer>> serializerFactory2;
- private TypeComparator<Record> comparator;
+ private TypeComparator<Tuple2<Integer, String>> comparator1;
+ private TypeComparator<Tuple2<Integer, Integer>> comparator2;
@SuppressWarnings("unchecked")
@Before
@@ -82,8 +80,11 @@ public class CombiningUnilateralSortMergerITCase {
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
this.ioManager = new IOManagerAsync();
- this.serializerFactory = RecordSerializerFactory.get();
- this.comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+ this.serializerFactory1 = TestData.getIntStringTupleSerializerFactory();
+ this.comparator1 = TestData.getIntStringTupleComparator();
+
+ this.serializerFactory2 = TestData.getIntIntTupleSerializerFactory();
+ this.comparator2 = TestData.getIntIntTupleComparator();
}
@After
@@ -107,32 +108,30 @@ public class CombiningUnilateralSortMergerITCase {
int noKeys = 100;
int noKeyCnt = 10000;
- MockRecordReader reader = new MockRecordReader();
+ TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();
LOG.debug("initializing sortmerger");
TestCountCombiner comb = new TestCountCombiner();
- Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb,
- this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
+ Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
+ this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2,
0.25, 64, 0.7f, false);
- final Record rec = new Record();
- rec.setField(1, new IntValue(1));
- final TestData.Key key = new TestData.Key();
+ final Tuple2<Integer, Integer> rec = new Tuple2<>();
+ rec.setField(1, 1);
for (int i = 0; i < noKeyCnt; i++) {
for (int j = 0; j < noKeys; j++) {
- key.setKey(j);
- rec.setField(0, key);
+ rec.setField(j, 0);
reader.emit(rec);
}
}
reader.close();
- MutableObjectIterator<Record> iterator = merger.getIterator();
+ MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
- Iterator<Integer> result = getReducingIterator(iterator, serializerFactory.getSerializer(), comparator.duplicate());
+ Iterator<Integer> result = getReducingIterator(iterator, serializerFactory2.getSerializer(), comparator2.duplicate());
while (result.hasNext()) {
Assert.assertEquals(noKeyCnt, result.next().intValue());
}
@@ -148,32 +147,30 @@ public class CombiningUnilateralSortMergerITCase {
int noKeys = 100;
int noKeyCnt = 10000;
- MockRecordReader reader = new MockRecordReader();
+ TestData.MockTuple2Reader<Tuple2<Integer, Integer>> reader = TestData.getIntIntTupleReader();
LOG.debug("initializing sortmerger");
TestCountCombiner comb = new TestCountCombiner();
- Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb,
- this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
+ Sorter<Tuple2<Integer, Integer>> merger = new CombiningUnilateralSortMerger<>(comb,
+ this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory2, this.comparator2,
0.01, 64, 0.005f, true);
- final Record rec = new Record();
- rec.setField(1, new IntValue(1));
- final TestData.Key key = new TestData.Key();
+ final Tuple2<Integer, Integer> rec = new Tuple2<>();
+ rec.setField(1, 1);
for (int i = 0; i < noKeyCnt; i++) {
for (int j = 0; j < noKeys; j++) {
- key.setKey(j);
- rec.setField(0, key);
+ rec.setField(j, 0);
reader.emit(rec);
}
}
reader.close();
- MutableObjectIterator<Record> iterator = merger.getIterator();
+ MutableObjectIterator<Tuple2<Integer, Integer>> iterator = merger.getIterator();
- Iterator<Integer> result = getReducingIterator(iterator, serializerFactory.getSerializer(), comparator.duplicate());
+ Iterator<Integer> result = getReducingIterator(iterator, serializerFactory2.getSerializer(), comparator2.duplicate());
while (result.hasNext()) {
Assert.assertEquals(noKeyCnt, result.next().intValue());
}
@@ -187,65 +184,60 @@ public class CombiningUnilateralSortMergerITCase {
@Test
public void testSortAndValidate() throws Exception
{
- final Hashtable<TestData.Key, Integer> countTable = new Hashtable<TestData.Key, Integer>(KEY_MAX);
+ final Hashtable<Integer, Integer> countTable = new Hashtable<>(KEY_MAX);
for (int i = 1; i <= KEY_MAX; i++) {
- countTable.put(new TestData.Key(i), 0);
+ countTable.put(i, 0);
}
// comparator
- final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+ final TypeComparator<Integer> keyComparator = new IntComparator(true);
// reader
- MockRecordReader reader = new MockRecordReader();
+ TestData.MockTuple2Reader<Tuple2<Integer, String>> reader = TestData.getIntStringTupleReader();
// merge iterator
LOG.debug("initializing sortmerger");
TestCountCombiner2 comb = new TestCountCombiner2();
- Sorter<Record> merger = new CombiningUnilateralSortMerger<Record>(comb,
- this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory, this.comparator,
+ Sorter<Tuple2<Integer, String>> merger = new CombiningUnilateralSortMerger<>(comb,
+ this.memoryManager, this.ioManager, reader, this.parentTask, this.serializerFactory1, this.comparator1,
0.25, 2, 0.7f, false);
// emit data
LOG.debug("emitting data");
- TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
- Record rec = new Record();
- final TestData.Value value = new TestData.Value("1");
+ TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+ Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS; i++) {
Assert.assertTrue((rec = generator.next(rec)) != null);
- final TestData.Key key = rec.getField(0, TestData.Key.class);
- rec.setField(1, value);
+ final Integer key = rec.f0;
+ rec.setField("1", 1);
reader.emit(rec);
- countTable.put(new TestData.Key(key.getKey()), countTable.get(key) + 1);
+ countTable.put(key, countTable.get(key) + 1);
}
reader.close();
- rec = null;
// check order
- MutableObjectIterator<Record> iterator = merger.getIterator();
+ MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
LOG.debug("checking results");
- Record rec1 = new Record();
- Record rec2 = new Record();
+ Tuple2<Integer, String> rec1 = new Tuple2<>();
+ Tuple2<Integer, String> rec2 = new Tuple2<>();
Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
- countTable.put(new TestData.Key(rec1.getField(0, TestData.Key.class).getKey()), countTable.get(rec1.getField(0, TestData.Key.class)) - (Integer.parseInt(rec1.getField(1, TestData.Value.class).toString())));
+ countTable.put(rec1.f0, countTable.get(rec1.f0) - (Integer.parseInt(rec1.f1)));
while ((rec2 = iterator.next(rec2)) != null) {
- final Key k1 = rec1.getField(0, TestData.Key.class);
- final Key k2 = rec2.getField(0, TestData.Key.class);
+ int k1 = rec1.f0;
+ int k2 = rec2.f0;
Assert.assertTrue(keyComparator.compare(k1, k2) <= 0);
- countTable.put(new TestData.Key(k2.getKey()), countTable.get(k2) - (Integer.parseInt(rec2.getField(1, TestData.Value.class).toString())));
+ countTable.put(k2, countTable.get(k2) - (Integer.parseInt(rec2.f1)));
- Record tmp = rec1;
rec1 = rec2;
- k1.setKey(k2.getKey());
- rec2 = tmp;
}
for (Integer cnt : countTable.values()) {
@@ -260,10 +252,10 @@ public class CombiningUnilateralSortMergerITCase {
// --------------------------------------------------------------------------------------------
- public static class TestCountCombiner extends RichGroupReduceFunction<Record, Record> {
+ public static class TestCountCombiner extends RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
private static final long serialVersionUID = 1L;
- private final IntValue count = new IntValue();
+ private Integer count = 0;
public volatile boolean opened = false;
@@ -271,21 +263,21 @@ public class CombiningUnilateralSortMergerITCase {
@Override
- public void combine(Iterable<Record> values, Collector<Record> out) {
- Record rec = null;
+ public void combine(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) {
+ Tuple2<Integer, Integer> rec = new Tuple2<>();
int cnt = 0;
- for (Record next : values) {
+ for (Tuple2<Integer, Integer> next : values) {
rec = next;
- cnt += rec.getField(1, IntValue.class).getValue();
+ cnt += rec.f1;
}
- this.count.setValue(cnt);
- rec.setField(1, this.count);
+ this.count = cnt;
+ rec.setField(this.count, 1);
out.collect(rec);
}
@Override
- public void reduce(Iterable<Record> values, Collector<Record> out) {}
+ public void reduce(Iterable<Tuple2<Integer, Integer>> values, Collector<Tuple2<Integer, Integer>> out) {}
@Override
public void open(Configuration parameters) throws Exception {
@@ -298,7 +290,7 @@ public class CombiningUnilateralSortMergerITCase {
}
}
- public static class TestCountCombiner2 extends RichGroupReduceFunction<Record, Record> {
+ public static class TestCountCombiner2 extends RichGroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
private static final long serialVersionUID = 1L;
public volatile boolean opened = false;
@@ -306,19 +298,19 @@ public class CombiningUnilateralSortMergerITCase {
public volatile boolean closed = false;
@Override
- public void combine(Iterable<Record> values, Collector<Record> out) {
- Record rec = null;
+ public void combine(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) {
+ Tuple2<Integer, String> rec = new Tuple2<>();
int cnt = 0;
- for (Record next : values) {
+ for (Tuple2<Integer, String> next : values) {
rec = next;
- cnt += Integer.parseInt(rec.getField(1, TestData.Value.class).toString());
+ cnt += Integer.parseInt(rec.f1);
}
- out.collect(new Record(rec.getField(0, Key.class), new TestData.Value(cnt + "")));
+ out.collect(new Tuple2(rec.f0, cnt + ""));
}
@Override
- public void reduce(Iterable<Record> values, Collector<Record> out) {
+ public void reduce(Iterable<Tuple2<Integer, String>> values, Collector<Tuple2<Integer, String>> out) {
// yo, nothing, mon
}
@@ -333,9 +325,9 @@ public class CombiningUnilateralSortMergerITCase {
}
}
- private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Record> data, TypeSerializer<Record> serializer, TypeComparator<Record> comparator) {
+ private static Iterator<Integer> getReducingIterator(MutableObjectIterator<Tuple2<Integer, Integer>> data, TypeSerializer<Tuple2<Integer, Integer>> serializer, TypeComparator<Tuple2<Integer, Integer>> comparator) {
- final ReusingKeyGroupedIterator<Record> groupIter = new ReusingKeyGroupedIterator<Record>(data, serializer, comparator);
+ final ReusingKeyGroupedIterator<Tuple2<Integer, Integer>> groupIter = new ReusingKeyGroupedIterator<> (data, serializer, comparator);
return new Iterator<Integer>() {
@@ -360,13 +352,13 @@ public class CombiningUnilateralSortMergerITCase {
if (hasNext()) {
hasNext = false;
- Iterator<Record> values = groupIter.getValues();
+ Iterator<Tuple2<Integer, Integer>> values = groupIter.getValues();
- Record rec = null;
+ Tuple2<Integer, Integer> rec;
int cnt = 0;
while (values.hasNext()) {
rec = values.next();
- cnt += rec.getField(1, IntValue.class).getValue();
+ cnt += rec.f1;
}
return cnt;
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
index 9f0b3d9..b19591b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortITCase.java
@@ -18,12 +18,11 @@
package org.apache.flink.runtime.operators.sort;
-import java.util.Comparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -31,14 +30,10 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.RandomIntPairGenerator;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
@@ -58,7 +53,7 @@ public class ExternalSortITCase {
private static final int VALUE_LENGTH = 114;
- private static final Value VAL = new Value("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ");
+ private static final String VAL = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
private static final int NUM_PAIRS = 200000;
@@ -70,9 +65,9 @@ public class ExternalSortITCase {
private MemoryManager memoryManager;
- private TypeSerializerFactory<Record> pactRecordSerializer;
+ private TypeSerializerFactory<Tuple2<Integer, String>> pactRecordSerializer;
- private TypeComparator<Record> pactRecordComparator;
+ private TypeComparator<Tuple2<Integer, String>> pactRecordComparator;
private boolean testSuccess;
@@ -84,8 +79,8 @@ public class ExternalSortITCase {
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
this.ioManager = new IOManagerAsync();
- this.pactRecordSerializer = RecordSerializerFactory.get();
- this.pactRecordComparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+ this.pactRecordSerializer = TestData.getIntStringTupleSerializerFactory();
+ this.pactRecordComparator = TestData.getIntStringTupleComparator();
}
@After
@@ -109,15 +104,15 @@ public class ExternalSortITCase {
public void testInMemorySort() {
try {
// comparator
- final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+ final TypeComparator<Integer> keyComparator = new IntComparator(true);
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
- final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+ final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
// merge iterator
LOG.debug("Initializing sortmerger...");
- Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager,
+ Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
(double)64/78, 2, 0.9f, true);
@@ -125,26 +120,22 @@ public class ExternalSortITCase {
LOG.debug("Reading and sorting data...");
// check order
- MutableObjectIterator<Record> iterator = merger.getIterator();
+ MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
LOG.debug("Checking results...");
int pairsEmitted = 1;
- Record rec1 = new Record();
- Record rec2 = new Record();
+ Tuple2<Integer, String> rec1 = new Tuple2<>();
+ Tuple2<Integer, String> rec2 = new Tuple2<>();
Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
while ((rec2 = iterator.next(rec2)) != null) {
- final Key k1 = rec1.getField(0, TestData.Key.class);
- final Key k2 = rec2.getField(0, TestData.Key.class);
pairsEmitted++;
- Assert.assertTrue(keyComparator.compare(k1, k2) <= 0);
-
- Record tmp = rec1;
+ Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+ Tuple2<Integer, String> tmp = rec1;
rec1 = rec2;
- k1.setKey(k2.getKey());
-
rec2 = tmp;
}
Assert.assertTrue(NUM_PAIRS == pairsEmitted);
@@ -162,15 +153,15 @@ public class ExternalSortITCase {
public void testInMemorySortUsing10Buffers() {
try {
// comparator
- final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+ final TypeComparator<Integer> keyComparator = new IntComparator(true);
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
- final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+ final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
// merge iterator
LOG.debug("Initializing sortmerger...");
- Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager,
+ Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
(double)64/78, 10, 2, 0.9f, false);
@@ -178,26 +169,22 @@ public class ExternalSortITCase {
LOG.debug("Reading and sorting data...");
// check order
- MutableObjectIterator<Record> iterator = merger.getIterator();
+ MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
LOG.debug("Checking results...");
int pairsEmitted = 1;
- Record rec1 = new Record();
- Record rec2 = new Record();
+ Tuple2<Integer, String> rec1 = new Tuple2<>();
+ Tuple2<Integer, String> rec2 = new Tuple2<>();
Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
while ((rec2 = iterator.next(rec2)) != null) {
- final Key k1 = rec1.getField(0, TestData.Key.class);
- final Key k2 = rec2.getField(0, TestData.Key.class);
pairsEmitted++;
- Assert.assertTrue(keyComparator.compare(k1, k2) <= 0);
-
- Record tmp = rec1;
+ Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+ Tuple2<Integer, String> tmp = rec1;
rec1 = rec2;
- k1.setKey(k2.getKey());
-
rec2 = tmp;
}
Assert.assertTrue(NUM_PAIRS == pairsEmitted);
@@ -215,15 +202,15 @@ public class ExternalSortITCase {
public void testSpillingSort() {
try {
// comparator
- final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+ final TypeComparator<Integer> keyComparator = new IntComparator(true);
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
- final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, NUM_PAIRS);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.CONSTANT, VAL);
+ final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, NUM_PAIRS);
// merge iterator
LOG.debug("Initializing sortmerger...");
- Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager,
+ Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
(double)16/78, 64, 0.7f, true);
@@ -231,26 +218,22 @@ public class ExternalSortITCase {
LOG.debug("Reading and sorting data...");
// check order
- MutableObjectIterator<Record> iterator = merger.getIterator();
+ MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
LOG.debug("Checking results...");
int pairsEmitted = 1;
- Record rec1 = new Record();
- Record rec2 = new Record();
+ Tuple2<Integer, String> rec1 = new Tuple2<>();
+ Tuple2<Integer, String> rec2 = new Tuple2<>();
Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
while ((rec2 = iterator.next(rec2)) != null) {
- final Key k1 = rec1.getField(0, TestData.Key.class);
- final Key k2 = rec2.getField(0, TestData.Key.class);
pairsEmitted++;
- Assert.assertTrue(keyComparator.compare(k1, k2) <= 0);
-
- Record tmp = rec1;
+ Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+ Tuple2<Integer, String> tmp = rec1;
rec1 = rec2;
- k1.setKey(k2.getKey());
-
rec2 = tmp;
}
Assert.assertTrue(NUM_PAIRS == pairsEmitted);
@@ -271,15 +254,15 @@ public class ExternalSortITCase {
final int PAIRS = 10000000;
// comparator
- final Comparator<TestData.Key> keyComparator = new TestData.KeyComparator();
+ final TypeComparator<Integer> keyComparator = new IntComparator(true);
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
- final MutableObjectIterator<Record> source = new TestData.GeneratorIterator(generator, PAIRS);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+ final MutableObjectIterator<Tuple2<Integer, String>> source = new TestData.TupleGeneratorIterator(generator, PAIRS);
// merge iterator
LOG.debug("Initializing sortmerger...");
- Sorter<Record> merger = new UnilateralSortMerger<Record>(this.memoryManager, this.ioManager,
+ Sorter<Tuple2<Integer, String>> merger = new UnilateralSortMerger<>(this.memoryManager, this.ioManager,
source, this.parentTask, this.pactRecordSerializer, this.pactRecordComparator,
(double)64/78, 16, 0.7f, false);
@@ -287,26 +270,23 @@ public class ExternalSortITCase {
LOG.debug("Emitting data...");
// check order
- MutableObjectIterator<Record> iterator = merger.getIterator();
+ MutableObjectIterator<Tuple2<Integer, String>> iterator = merger.getIterator();
LOG.debug("Checking results...");
int pairsRead = 1;
int nextStep = PAIRS / 20;
- Record rec1 = new Record();
- Record rec2 = new Record();
+ Tuple2<Integer, String> rec1 = new Tuple2<>();
+ Tuple2<Integer, String> rec2 = new Tuple2<>();
Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
while ((rec2 = iterator.next(rec2)) != null) {
- final Key k1 = rec1.getField(0, TestData.Key.class);
- final Key k2 = rec2.getField(0, TestData.Key.class);
pairsRead++;
- Assert.assertTrue(keyComparator.compare(k1, k2) <= 0);
-
- Record tmp = rec1;
+ Assert.assertTrue(keyComparator.compare(rec1.f0, rec2.f0) <= 0);
+
+ Tuple2<Integer, String> tmp = rec1;
rec1 = rec2;
- k1.setKey(k2.getKey());
rec2 = tmp;
// log
@@ -335,7 +315,7 @@ public class ExternalSortITCase {
final RandomIntPairGenerator generator = new RandomIntPairGenerator(12345678, PAIRS);
final TypeSerializerFactory<IntPair> serializerFactory = new IntPairSerializer.IntPairSerializerFactory();
- final TypeComparator<IntPair> comparator = new IntPairComparator();
+ final TypeComparator<IntPair> comparator = new TestData.IntPairComparator();
// merge iterator
LOG.debug("Initializing sortmerger...");
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
index 07330ee..3329335 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MergeIteratorTest.java
@@ -19,16 +19,12 @@
package org.apache.flink.runtime.operators.sort;
import java.util.ArrayList;
-import java.util.Comparator;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.runtime.operators.sort.MergeIterator;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Assert;
import org.junit.Before;
@@ -36,33 +32,33 @@ import org.junit.Test;
public class MergeIteratorTest {
- private TypeComparator<Record> comparator;
+ private TypeComparator<Tuple2<Integer, String>> comparator;
@SuppressWarnings("unchecked")
@Before
public void setup() {
- this.comparator = new RecordComparator(new int[] {0}, new Class[] { TestData.Key.class});
+ this.comparator = TestData.getIntStringTupleComparator();
}
- private MutableObjectIterator<Record> newIterator(final int[] keys, final String[] values) {
+ private MutableObjectIterator<Tuple2<Integer, String>> newIterator(final int[] keys, final String[] values) {
- return new MutableObjectIterator<Record>() {
+ return new MutableObjectIterator<Tuple2<Integer, String>>() {
- private Key key = new Key();
- private Value value = new Value();
+ private int key = 0;
+ private String value = new String();
private int current = 0;
@Override
- public Record next(Record reuse) {
+ public Tuple2<Integer, String> next(Tuple2<Integer, String> reuse) {
if (current < keys.length) {
- key.setKey(keys[current]);
- value.setValue(values[current]);
+ key = keys[current];
+ value = values[current];
current++;
- reuse.setField(0, key);
- reuse.setField(1, value);
+ reuse.setField(key, 0);
+ reuse.setField(value, 1);
return reuse;
}
else {
@@ -71,9 +67,9 @@ public class MergeIteratorTest {
}
@Override
- public Record next() {
+ public Tuple2<Integer, String> next() {
if (current < keys.length) {
- Record result = new Record(new Key(keys[current]), new Value(values[current]));
+ Tuple2<Integer, String> result = new Tuple2<>(keys[current], values[current]);
current++;
return result;
}
@@ -88,37 +84,37 @@ public class MergeIteratorTest {
public void testMergeOfTwoStreams() throws Exception
{
// iterators
- List<MutableObjectIterator<Record>> iterators = new ArrayList<MutableObjectIterator<Record>>();
+ List<MutableObjectIterator<Tuple2<Integer, String>>> iterators = new ArrayList<>();
iterators.add(newIterator(new int[] { 1, 2, 4, 5, 10 }, new String[] { "1", "2", "4", "5", "10" }));
iterators.add(newIterator(new int[] { 3, 6, 7, 10, 12 }, new String[] { "3", "6", "7", "10", "12" }));
final int[] expected = new int[] {1, 2, 3, 4, 5, 6, 7, 10, 10, 12};
// comparator
- Comparator<TestData.Key> comparator = new TestData.KeyComparator();
+ TypeComparator<Integer> comparator = new IntComparator(true);
// merge iterator
- MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
+ MutableObjectIterator<Tuple2<Integer, String>> iterator = new MergeIterator<>(iterators, this.comparator);
// check expected order
- Record rec1 = new Record();
- Record rec2 = new Record();
- final Key k1 = new Key();
- final Key k2 = new Key();
+ Tuple2<Integer, String> rec1 = new Tuple2<>();
+ Tuple2<Integer, String> rec2 = new Tuple2<>();
+ int k1 = 0;
+ int k2 = 0;
int pos = 1;
Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
- Assert.assertEquals(expected[0], rec1.getField(0, TestData.Key.class).getKey());
+ Assert.assertEquals(expected[0], rec1.f0.intValue());
while ((rec2 = iterator.next(rec2)) != null) {
- k1.setKey(rec1.getField(0, TestData.Key.class).getKey());
- k2.setKey(rec2.getField(0, TestData.Key.class).getKey());
+ k1 = rec1.f0;
+ k2 = rec2.f0;
Assert.assertTrue(comparator.compare(k1, k2) <= 0);
- Assert.assertEquals(expected[pos++], k2.getKey());
+ Assert.assertEquals(expected[pos++], k2);
- Record tmp = rec1;
+ Tuple2<Integer, String> tmp = rec1;
rec1 = rec2;
rec2 = tmp;
}
@@ -128,7 +124,7 @@ public class MergeIteratorTest {
public void testMergeOfTenStreams() throws Exception
{
// iterators
- List<MutableObjectIterator<Record>> iterators = new ArrayList<MutableObjectIterator<Record>>();
+ List<MutableObjectIterator<Tuple2<Integer, String>>> iterators = new ArrayList<>();
iterators.add(newIterator(new int[] { 1, 2, 17, 23, 23 }, new String[] { "A", "B", "C", "D", "E" }));
iterators.add(newIterator(new int[] { 2, 6, 7, 8, 9 }, new String[] { "A", "B", "C", "D", "E" }));
iterators.add(newIterator(new int[] { 4, 10, 11, 11, 12 }, new String[] { "A", "B", "C", "D", "E" }));
@@ -141,26 +137,23 @@ public class MergeIteratorTest {
iterators.add(newIterator(new int[] { 8, 8, 14, 14, 15 }, new String[] { "A", "B", "C", "D", "E" }));
// comparator
- Comparator<TestData.Key> comparator = new TestData.KeyComparator();
+ TypeComparator<Integer> comparator = new IntComparator(true);
// merge iterator
- MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
+ MutableObjectIterator<Tuple2<Integer, String>> iterator = new MergeIterator<>(iterators, this.comparator);
int elementsFound = 1;
// check expected order
- Record rec1 = new Record();
- Record rec2 = new Record();
- final Key k1 = new Key();
- final Key k2 = new Key();
+ Tuple2<Integer, String> rec1 = new Tuple2<>();
+ Tuple2<Integer, String> rec2 = new Tuple2<>();
Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
while ((rec2 = iterator.next(rec2)) != null) {
elementsFound++;
- k1.setKey(rec1.getField(0, TestData.Key.class).getKey());
- k2.setKey(rec2.getField(0, TestData.Key.class).getKey());
- Assert.assertTrue(comparator.compare(k1, k2) <= 0);
+
+ Assert.assertTrue(comparator.compare(rec1.f0, rec2.f0) <= 0);
- Record tmp = rec1;
+ Tuple2<Integer, String> tmp = rec1;
rec1 = rec2;
rec2 = tmp;
}
@@ -172,7 +165,7 @@ public class MergeIteratorTest {
public void testInvalidMerge() throws Exception
{
// iterators
- List<MutableObjectIterator<Record>> iterators = new ArrayList<MutableObjectIterator<Record>>();
+ List<MutableObjectIterator<Tuple2<Integer, String>>> iterators = new ArrayList<>();
iterators.add(newIterator(new int[] { 1, 2, 17, 23, 23 }, new String[] { "A", "B", "C", "D", "E" }));
iterators.add(newIterator(new int[] { 2, 6, 7, 8, 9 }, new String[] { "A", "B", "C", "D", "E" }));
iterators.add(newIterator(new int[] { 4, 10, 11, 11, 12 }, new String[] { "A", "B", "C", "D", "E" }));
@@ -185,31 +178,26 @@ public class MergeIteratorTest {
iterators.add(newIterator(new int[] { 8, 8, 14, 14, 15 }, new String[] { "A", "B", "C", "D", "E" }));
// comparator
- Comparator<TestData.Key> comparator = new TestData.KeyComparator();
+ TypeComparator<Integer> comparator = new IntComparator(true);
// merge iterator
- MutableObjectIterator<Record> iterator = new MergeIterator<Record>(iterators, this.comparator);
+ MutableObjectIterator<Tuple2<Integer, String>> iterator = new MergeIterator<>(iterators, this.comparator);
boolean violationFound = false;
// check expected order
- Record rec1 = new Record();
- Record rec2 = new Record();
+ Tuple2<Integer, String> rec1 = new Tuple2<>();
+ Tuple2<Integer, String> rec2 = new Tuple2<>();
Assert.assertTrue((rec1 = iterator.next(rec1)) != null);
while ((rec2 = iterator.next(rec2)) != null)
- {
- final Key k1 = new Key();
- final Key k2 = new Key();
- k1.setKey(rec1.getField(0, TestData.Key.class).getKey());
- k2.setKey(rec2.getField(0, TestData.Key.class).getKey());
-
- if (comparator.compare(k1, k2) > 0) {
+ {
+ if (comparator.compare(rec1.f0, rec2.f0) > 0) {
violationFound = true;
break;
}
- Record tmp = rec1;
+ Tuple2<Integer, String> tmp = rec1;
rec1 = rec2;
rec2 = tmp;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
index 1a6884e..dc517b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NonReusingSortMergeCoGroupIteratorITCase.java
@@ -22,14 +22,10 @@ package org.apache.flink.runtime.operators.sort;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Assert;
import org.junit.Before;
@@ -43,6 +39,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
/**
*/
@@ -59,77 +57,77 @@ public class NonReusingSortMergeCoGroupIteratorITCase
private static final long SEED2 = 231434613412342L;
// left and right input data generators
- private Generator generator1;
+ private TupleGenerator generator1;
- private Generator generator2;
+ private TupleGenerator generator2;
- // left and right input RecordReader mocks
- private MutableObjectIterator<Record> reader1;
+ // left and right input TupleReader mocks
+ private MutableObjectIterator<Tuple2<Integer, String>> reader1;
- private MutableObjectIterator<Record> reader2;
+ private MutableObjectIterator<Tuple2<Integer, String>> reader2;
- private TypeSerializer<Record> serializer1;
- private TypeSerializer<Record> serializer2;
- private TypeComparator<Record> comparator1;
- private TypeComparator<Record> comparator2;
- private TypePairComparator<Record, Record> pairComparator;
+ private TypeSerializer<Tuple2<Integer, String>> serializer1;
+ private TypeSerializer<Tuple2<Integer, String>> serializer2;
+ private TypeComparator<Tuple2<Integer, String>> comparator1;
+ private TypeComparator<Tuple2<Integer, String>> comparator2;
+ private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;
@SuppressWarnings("unchecked")
@Before
public void beforeTest() {
- this.serializer1 = RecordSerializer.get();
- this.serializer2 = RecordSerializer.get();
- this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
- this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
- this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
+ this.serializer1 = TestData.getIntStringTupleSerializer();
+ this.serializer2 = TestData.getIntStringTupleSerializer();
+ this.comparator1 = TestData.getIntStringTupleComparator();
+ this.comparator2 = TestData.getIntStringTupleComparator();
+ this.pairComparator = new GenericPairComparator(this.comparator1, this.comparator2);
}
@Test
public void testMerge() {
try {
- generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
- generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+ generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+ generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
- reader1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- reader2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ reader1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ reader2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
- Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
- Map<TestData.Key, List<Collection<TestData.Value>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
+ Map<Integer, Collection<String>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
+ Map<Integer, Collection<String>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
+ Map<Integer, List<Collection<String>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
// reset the generators
generator1.reset();
generator2.reset();
// compare with iterator values
- NonReusingSortMergeCoGroupIterator<Record, Record> iterator = new NonReusingSortMergeCoGroupIterator<Record, Record>(
+ NonReusingSortMergeCoGroupIterator<Tuple2<Integer, String>,Tuple2<Integer, String>> iterator = new NonReusingSortMergeCoGroupIterator<>(
this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
this.pairComparator);
iterator.open();
- final TestData.Key key = new TestData.Key();
+ int key = 0;
while (iterator.next())
{
- Iterator<Record> iter1 = iterator.getValues1().iterator();
- Iterator<Record> iter2 = iterator.getValues2().iterator();
+ Iterator<Tuple2<Integer, String>> iter1 = iterator.getValues1().iterator();
+ Iterator<Tuple2<Integer, String>> iter2 = iterator.getValues2().iterator();
- TestData.Value v1 = null;
- TestData.Value v2 = null;
+ String v1 = null;
+ String v2 = null;
if (iter1.hasNext()) {
- Record rec = iter1.next();
- rec.getFieldInto(0, key);
- v1 = rec.getField(1, TestData.Value.class);
+ Tuple2<Integer, String> rec = iter1.next();
+ key = rec.f0;
+ v1 = rec.f1;
}
else if (iter2.hasNext()) {
- Record rec = iter2.next();
- rec.getFieldInto(0, key);
- v2 = rec.getField(1, TestData.Value.class);
+ Tuple2<Integer, String> rec = iter2.next();
+ key = rec.f0;
+ v2 = rec.f1;
}
else {
Assert.fail("No input on both sides.");
@@ -138,8 +136,8 @@ public class NonReusingSortMergeCoGroupIteratorITCase
// assert that matches for this key exist
Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
- Collection<TestData.Value> expValues1 = expectedCoGroupsMap.get(key).get(0);
- Collection<TestData.Value> expValues2 = expectedCoGroupsMap.get(key).get(1);
+ Collection<String> expValues1 = expectedCoGroupsMap.get(key).get(0);
+ Collection<String> expValues2 = expectedCoGroupsMap.get(key).get(1);
if (v1 != null) {
expValues1.remove(v1);
@@ -149,14 +147,14 @@ public class NonReusingSortMergeCoGroupIteratorITCase
}
while(iter1.hasNext()) {
- Record rec = iter1.next();
- Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.getField(1, TestData.Value.class)));
+ Tuple2<Integer, String> rec = iter1.next();
+ Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.f1));
}
Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());
while(iter2.hasNext()) {
- Record rec = iter2.next();
- Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.getField(1, TestData.Value.class)));
+ Tuple2<Integer, String> rec = iter2.next();
+ Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.f1));
}
Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());
@@ -174,28 +172,28 @@ public class NonReusingSortMergeCoGroupIteratorITCase
// --------------------------------------------------------------------------------------------
- private Map<TestData.Key, List<Collection<TestData.Value>>> coGroupValues(
- Map<TestData.Key, Collection<TestData.Value>> leftMap,
- Map<TestData.Key, Collection<TestData.Value>> rightMap)
+ private Map<Integer, List<Collection<String>>> coGroupValues(
+ Map<Integer, Collection<String>> leftMap,
+ Map<Integer, Collection<String>> rightMap)
{
- Map<TestData.Key, List<Collection<TestData.Value>>> map = new HashMap<TestData.Key, List<Collection<TestData.Value>>>(1000);
+ Map<Integer, List<Collection<String>>> map = new HashMap<>(1000);
- Set<TestData.Key> keySet = new HashSet<TestData.Key>(leftMap.keySet());
+ Set<Integer> keySet = new HashSet<>(leftMap.keySet());
keySet.addAll(rightMap.keySet());
- for (TestData.Key key : keySet) {
- Collection<TestData.Value> leftValues = leftMap.get(key);
- Collection<TestData.Value> rightValues = rightMap.get(key);
- ArrayList<Collection<TestData.Value>> list = new ArrayList<Collection<TestData.Value>>(2);
+ for (Integer key : keySet) {
+ Collection<String> leftValues = leftMap.get(key);
+ Collection<String> rightValues = rightMap.get(key);
+ ArrayList<Collection<String>> list = new ArrayList<>(2);
if (leftValues == null) {
- list.add(new ArrayList<TestData.Value>(0));
+ list.add(new ArrayList<String>(0));
} else {
list.add(leftValues);
}
if (rightValues == null) {
- list.add(new ArrayList<TestData.Value>(0));
+ list.add(new ArrayList<String>(0));
} else {
list.add(rightValues);
}
@@ -205,22 +203,22 @@ public class NonReusingSortMergeCoGroupIteratorITCase
return map;
}
- private Map<TestData.Key, Collection<TestData.Value>> collectData(Generator iter, int num)
+ private Map<Integer, Collection<String>> collectData(TupleGenerator iter, int num)
throws Exception
{
- Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
- Record pair = new Record();
+ Map<Integer, Collection<String>> map = new HashMap<>();
+ Tuple2<Integer, String> pair = new Tuple2<>();
for (int i = 0; i < num; i++) {
iter.next(pair);
- TestData.Key key = pair.getField(0, TestData.Key.class);
+ Integer key = pair.f0;
if (!map.containsKey(key)) {
- map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+ map.put(key, new ArrayList<String>());
}
- Collection<TestData.Value> values = map.get(key);
- values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+ Collection<String> values = map.get(key);
+ values.add(pair.f1);
}
return map;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
index f8a8f11..45f20ca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/NormalizedKeySorterTest.java
@@ -21,19 +21,16 @@ package org.apache.flink.runtime.operators.sort;
import java.util.List;
import java.util.Random;
+import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
@@ -76,10 +73,9 @@ public class NormalizedKeySorterTest {
}
}
- private NormalizedKeySorter<Record> newSortBuffer(List<MemorySegment> memory) throws Exception {
- @SuppressWarnings({"unchecked", "rawtypes"})
- RecordComparator accessors = new RecordComparator(new int[] {0}, new Class[]{ Key.class });
- return new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
+ private NormalizedKeySorter<Tuple2<Integer, String>> newSortBuffer(List<MemorySegment> memory) throws Exception
+ {
+ return new NormalizedKeySorter<>(TestData.getIntStringTupleSerializer(), TestData.getIntStringTupleComparator(), memory);
}
@Test
@@ -87,12 +83,12 @@ public class NormalizedKeySorterTest {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
- NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
- TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+ NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+ TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
ValueMode.RANDOM_LENGTH);
// write the records
- Record record = new Record();
+ Tuple2<Integer, String> record = new Tuple2<>();
int num = -1;
do {
generator.next(record);
@@ -102,18 +98,18 @@ public class NormalizedKeySorterTest {
// re-read the records
generator.reset();
- Record readTarget = new Record();
+ Tuple2<Integer, String> readTarget = new Tuple2<>();
int i = 0;
while (i < num) {
generator.next(record);
readTarget = sorter.getRecord(readTarget, i++);
- Key rk = readTarget.getField(0, Key.class);
- Key gk = record.getField(0, Key.class);
+ int rk = readTarget.f0;
+ int gk = record.f0;
- Value rv = readTarget.getField(1, Value.class);
- Value gv = record.getField(1, Value.class);
+ String rv = readTarget.f1;
+ String gv = record.f1;
Assert.assertEquals("The re-read key is wrong", gk, rk);
Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -129,12 +125,12 @@ public class NormalizedKeySorterTest {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
- NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
- TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+ NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+ TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
ValueMode.RANDOM_LENGTH);
// write the records
- Record record = new Record();
+ Tuple2<Integer, String> record = new Tuple2<>();
do {
generator.next(record);
}
@@ -142,17 +138,17 @@ public class NormalizedKeySorterTest {
// re-read the records
generator.reset();
- MutableObjectIterator<Record> iter = sorter.getIterator();
- Record readTarget = new Record();
+ MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+ Tuple2<Integer, String> readTarget = new Tuple2<>();
while ((readTarget = iter.next(readTarget)) != null) {
generator.next(record);
- Key rk = readTarget.getField(0, Key.class);
- Key gk = record.getField(0, Key.class);
+ int rk = readTarget.f0;
+ int gk = record.f0;
- Value rv = readTarget.getField(1, Value.class);
- Value gv = record.getField(1, Value.class);
+ String rv = readTarget.f1;
+ String gv = record.f1;
Assert.assertEquals("The re-read key is wrong", gk, rk);
Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -168,11 +164,11 @@ public class NormalizedKeySorterTest {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
- NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
- TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+ NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+ TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
// write the buffer full with the first set of records
- Record record = new Record();
+ Tuple2<Integer, String> record = new Tuple2<>();
int num = -1;
do {
generator.next(record);
@@ -183,7 +179,7 @@ public class NormalizedKeySorterTest {
sorter.reset();
// write a second sequence of records. since the values are of fixed length, we must be able to write an equal number
- generator = new TestData.Generator(SEED2, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
+ generator = new TestData.TupleGenerator(SEED2, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.FIX_LENGTH);
// write the buffer full with the first set of records
int num2 = -1;
@@ -197,18 +193,18 @@ public class NormalizedKeySorterTest {
// re-read the records
generator.reset();
- Record readTarget = new Record();
+ Tuple2<Integer, String> readTarget = new Tuple2<>();
int i = 0;
while (i < num) {
generator.next(record);
readTarget = sorter.getRecord(readTarget, i++);
- Key rk = readTarget.getField(0, Key.class);
- Key gk = record.getField(0, Key.class);
+ int rk = readTarget.f0;
+ int gk = record.f0;
- Value rv = readTarget.getField(1, Value.class);
- Value gv = record.getField(1, Value.class);
+ String rv = readTarget.f1;
+ String gv = record.f1;
Assert.assertEquals("The re-read key is wrong", gk, rk);
Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -229,12 +225,12 @@ public class NormalizedKeySorterTest {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
- NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
- TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+ NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+ TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
ValueMode.RANDOM_LENGTH);
// write the records
- Record record = new Record();
+ Tuple2<Integer, String> record = new Tuple2<>();
int num = -1;
do {
generator.next(record);
@@ -250,18 +246,18 @@ public class NormalizedKeySorterTest {
// re-read the records
generator.reset();
- Record readTarget = new Record();
+ Tuple2<Integer, String> readTarget = new Tuple2<>();
int i = num - 1;
while (i >= 0) {
generator.next(record);
readTarget = sorter.getRecord(readTarget, i--);
- Key rk = readTarget.getField(0, Key.class);
- Key gk = record.getField(0, Key.class);
+ int rk = readTarget.f0;
+ int gk = record.f0;
- Value rv = readTarget.getField(1, Value.class);
- Value gv = record.getField(1, Value.class);
+ String rv = readTarget.f1;
+ String gv = record.f1;
Assert.assertEquals("The re-read key is wrong", gk, rk);
Assert.assertEquals("The re-read value is wrong", gv, rv);
@@ -282,12 +278,12 @@ public class NormalizedKeySorterTest {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
- NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
- TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED,
+ NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+ TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.SORTED,
ValueMode.RANDOM_LENGTH);
// write the records
- Record record = new Record();
+ Tuple2<Integer, String> record = new Tuple2<>();
int num = -1;
do {
generator.next(record);
@@ -323,12 +319,12 @@ public class NormalizedKeySorterTest {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
- NormalizedKeySorter<Record> sorter = newSortBuffer(memory);
- TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+ NormalizedKeySorter<Tuple2<Integer, String>> sorter = newSortBuffer(memory);
+ TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
ValueMode.RANDOM_LENGTH);
// write the records
- Record record = new Record();
+ Tuple2<Integer, String> record = new Tuple2<>();
int num = 0;
do {
generator.next(record);
@@ -339,26 +335,21 @@ public class NormalizedKeySorterTest {
QuickSort qs = new QuickSort();
qs.sort(sorter);
- MutableObjectIterator<Record> iter = sorter.getIterator();
- Record readTarget = new Record();
-
- Key current = new Key();
- Key last = new Key();
-
+ MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+ Tuple2<Integer, String> readTarget = new Tuple2<>();
+
iter.next(readTarget);
- readTarget.getFieldInto(0, last);
+ int last = readTarget.f0;
while ((readTarget = iter.next(readTarget)) != null) {
- readTarget.getFieldInto(0, current);
+ int current = readTarget.f0;
- final int cmp = last.compareTo(current);
+ final int cmp = last - current;
if (cmp > 0) {
Assert.fail("Next key is not larger or equal to previous key.");
}
- Key tmp = current;
- current = last;
- last = tmp;
+ last = current;
}
// release the memory occupied by the buffers
@@ -370,16 +361,16 @@ public class NormalizedKeySorterTest {
public void testSortShortStringKeys() throws Exception {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
-
- @SuppressWarnings({"unchecked", "rawtypes"})
- RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
- NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
- TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, 5, KeyMode.RANDOM,
+ @SuppressWarnings("unchecked")
+ TypeComparator<Tuple2<Integer, String>> accessors = TestData.getIntStringTupleTypeInfo().createComparator(new int[]{1}, new boolean[]{true}, 0, null);
+ NormalizedKeySorter<Tuple2<Integer, String>> sorter = new NormalizedKeySorter<>(TestData.getIntStringTupleSerializer(), accessors, memory);
+
+ TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, 5, KeyMode.RANDOM,
ValueMode.FIX_LENGTH);
// write the records
- Record record = new Record();
+ Tuple2<Integer, String> record = new Tuple2<>();
do {
generator.next(record);
}
@@ -388,26 +379,21 @@ public class NormalizedKeySorterTest {
QuickSort qs = new QuickSort();
qs.sort(sorter);
- MutableObjectIterator<Record> iter = sorter.getIterator();
- Record readTarget = new Record();
-
- Value current = new Value();
- Value last = new Value();
-
+ MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+ Tuple2<Integer, String> readTarget = new Tuple2<>();
+
iter.next(readTarget);
- readTarget.getFieldInto(1, last);
+ String last = readTarget.f1;
while ((readTarget = iter.next(readTarget)) != null) {
- readTarget.getFieldInto(1, current);
+ String current = readTarget.f1;
final int cmp = last.compareTo(current);
if (cmp > 0) {
Assert.fail("Next value is not larger or equal to previous value.");
}
- Value tmp = current;
- current = last;
- last = tmp;
+ last = current;
}
// release the memory occupied by the buffers
@@ -420,15 +406,15 @@ public class NormalizedKeySorterTest {
final int numSegments = MEMORY_SIZE / MEMORY_PAGE_SIZE;
final List<MemorySegment> memory = this.memoryManager.allocatePages(new DummyInvokable(), numSegments);
- @SuppressWarnings({"unchecked", "rawtypes"})
- RecordComparator accessors = new RecordComparator(new int[] {1}, new Class[]{Value.class});
- NormalizedKeySorter<Record> sorter = new NormalizedKeySorter<Record>(RecordSerializer.get(), accessors, memory);
+ @SuppressWarnings("unchecked")
+ TypeComparator<Tuple2<Integer, String>> accessors = TestData.getIntStringTupleTypeInfo().createComparator(new int[]{1}, new boolean[]{true}, 0, null);
+ NormalizedKeySorter<Tuple2<Integer, String>> sorter = new NormalizedKeySorter<>(TestData.getIntStringTupleSerializer(), accessors, memory);
- TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
+ TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM,
ValueMode.FIX_LENGTH);
// write the records
- Record record = new Record();
+ Tuple2<Integer, String> record = new Tuple2<>();
do {
generator.next(record);
}
@@ -437,26 +423,21 @@ public class NormalizedKeySorterTest {
QuickSort qs = new QuickSort();
qs.sort(sorter);
- MutableObjectIterator<Record> iter = sorter.getIterator();
- Record readTarget = new Record();
-
- Value current = new Value();
- Value last = new Value();
+ MutableObjectIterator<Tuple2<Integer, String>> iter = sorter.getIterator();
+ Tuple2<Integer, String> readTarget = new Tuple2<>();
iter.next(readTarget);
- readTarget.getFieldInto(1, last);
+ String last = readTarget.f1;
while ((readTarget = iter.next(readTarget)) != null) {
- readTarget.getFieldInto(1, current);
+ String current = readTarget.f1;
final int cmp = last.compareTo(current);
if (cmp > 0) {
Assert.fail("Next value is not larger or equal to previous value.");
}
- Value tmp = current;
- current = last;
- last = tmp;
+ last = current;
}
// release the memory occupied by the buffers
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
index a487a65..d1de5dc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ReusingSortMergeCoGroupIteratorITCase.java
@@ -27,18 +27,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Assert;
import org.junit.Before;
@@ -59,77 +57,76 @@ public class ReusingSortMergeCoGroupIteratorITCase
private static final long SEED2 = 231434613412342L;
// left and right input data generators
- private Generator generator1;
+ private TupleGenerator generator1;
- private Generator generator2;
+ private TupleGenerator generator2;
- // left and right input RecordReader mocks
- private MutableObjectIterator<Record> reader1;
+ // left and right input Tuple2<Integer, String>Reader mocks
+ private MutableObjectIterator<Tuple2<Integer, String>> reader1;
- private MutableObjectIterator<Record> reader2;
+ private MutableObjectIterator<Tuple2<Integer, String>> reader2;
-
- private TypeSerializer<Record> serializer1;
- private TypeSerializer<Record> serializer2;
- private TypeComparator<Record> comparator1;
- private TypeComparator<Record> comparator2;
- private TypePairComparator<Record, Record> pairComparator;
+ private TypeSerializer<Tuple2<Integer, String>> serializer1;
+ private TypeSerializer<Tuple2<Integer, String>> serializer2;
+ private TypeComparator<Tuple2<Integer, String>> comparator1;
+ private TypeComparator<Tuple2<Integer, String>> comparator2;
+ private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;
@SuppressWarnings("unchecked")
@Before
public void beforeTest() {
- this.serializer1 = RecordSerializer.get();
- this.serializer2 = RecordSerializer.get();
- this.comparator1 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
- this.comparator2 = new RecordComparator(new int[] {0}, new Class[]{TestData.Key.class});
- this.pairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[]{TestData.Key.class});
+ this.serializer1 = TestData.getIntStringTupleSerializer();
+ this.serializer2 = TestData.getIntStringTupleSerializer();
+ this.comparator1 = TestData.getIntStringTupleComparator();
+ this.comparator2 = TestData.getIntStringTupleComparator();
+ this.pairComparator = new GenericPairComparator(comparator1, comparator2);
}
@Test
public void testMerge() {
try {
- generator1 = new Generator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
- generator2 = new Generator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+ generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
+ generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.SORTED, ValueMode.RANDOM_LENGTH);
- reader1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- reader2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ reader1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ reader2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap1 = collectData(generator1, INPUT_1_SIZE);
- Map<TestData.Key, Collection<TestData.Value>> expectedValuesMap2 = collectData(generator2, INPUT_2_SIZE);
- Map<TestData.Key, List<Collection<TestData.Value>>> expectedCoGroupsMap = coGroupValues(expectedValuesMap1, expectedValuesMap2);
+ Map<Integer, Collection<String>> expectedStringsMap1 = collectData(generator1, INPUT_1_SIZE);
+ Map<Integer, Collection<String>> expectedStringsMap2 = collectData(generator2, INPUT_2_SIZE);
+ Map<Integer, List<Collection<String>>> expectedCoGroupsMap = coGroupValues(expectedStringsMap1, expectedStringsMap2);
// reset the generators
generator1.reset();
generator2.reset();
// compare with iterator values
- ReusingSortMergeCoGroupIterator<Record, Record> iterator = new ReusingSortMergeCoGroupIterator<Record, Record>(
+ ReusingSortMergeCoGroupIterator<Tuple2<Integer, String>, Tuple2<Integer, String>> iterator = new ReusingSortMergeCoGroupIterator<>(
this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2,
this.pairComparator);
iterator.open();
- final TestData.Key key = new TestData.Key();
+ int key = 0;
while (iterator.next())
{
- Iterator<Record> iter1 = iterator.getValues1().iterator();
- Iterator<Record> iter2 = iterator.getValues2().iterator();
+ Iterator<Tuple2<Integer, String>> iter1 = iterator.getValues1().iterator();
+ Iterator<Tuple2<Integer, String>> iter2 = iterator.getValues2().iterator();
- TestData.Value v1 = null;
- TestData.Value v2 = null;
+ String v1 = null;
+ String v2 = null;
if (iter1.hasNext()) {
- Record rec = iter1.next();
- rec.getFieldInto(0, key);
- v1 = rec.getField(1, TestData.Value.class);
+ Tuple2<Integer, String> rec = iter1.next();
+ key = rec.f0;
+ v1 = rec.f1;
}
else if (iter2.hasNext()) {
- Record rec = iter2.next();
- rec.getFieldInto(0, key);
- v2 = rec.getField(1, TestData.Value.class);
+ Tuple2<Integer, String> rec = iter2.next();
+ key = rec.f0;
+ v2 = rec.f1;
}
else {
Assert.fail("No input on both sides.");
@@ -138,8 +135,8 @@ public class ReusingSortMergeCoGroupIteratorITCase
// assert that matches for this key exist
Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
- Collection<TestData.Value> expValues1 = expectedCoGroupsMap.get(key).get(0);
- Collection<TestData.Value> expValues2 = expectedCoGroupsMap.get(key).get(1);
+ Collection<String> expValues1 = expectedCoGroupsMap.get(key).get(0);
+ Collection<String> expValues2 = expectedCoGroupsMap.get(key).get(1);
if (v1 != null) {
expValues1.remove(v1);
@@ -149,14 +146,14 @@ public class ReusingSortMergeCoGroupIteratorITCase
}
while(iter1.hasNext()) {
- Record rec = iter1.next();
- Assert.assertTrue("Value not in expected set of first input", expValues1.remove(rec.getField(1, TestData.Value.class)));
+ Tuple2<Integer, String> rec = iter1.next();
+ Assert.assertTrue("String not in expected set of first input", expValues1.remove(rec.f1));
}
Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());
while(iter2.hasNext()) {
- Record rec = iter2.next();
- Assert.assertTrue("Value not in expected set of second input", expValues2.remove(rec.getField(1, TestData.Value.class)));
+ Tuple2<Integer, String> rec = iter2.next();
+ Assert.assertTrue("String not in expected set of second input", expValues2.remove(rec.f1));
}
Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());
@@ -174,28 +171,28 @@ public class ReusingSortMergeCoGroupIteratorITCase
// --------------------------------------------------------------------------------------------
- private Map<TestData.Key, List<Collection<TestData.Value>>> coGroupValues(
- Map<TestData.Key, Collection<TestData.Value>> leftMap,
- Map<TestData.Key, Collection<TestData.Value>> rightMap)
+ private Map<Integer, List<Collection<String>>> coGroupValues(
+ Map<Integer, Collection<String>> leftMap,
+ Map<Integer, Collection<String>> rightMap)
{
- Map<TestData.Key, List<Collection<TestData.Value>>> map = new HashMap<TestData.Key, List<Collection<TestData.Value>>>(1000);
+ Map<Integer, List<Collection<String>>> map = new HashMap<>(1000);
- Set<TestData.Key> keySet = new HashSet<TestData.Key>(leftMap.keySet());
+ Set<Integer> keySet = new HashSet<>(leftMap.keySet());
keySet.addAll(rightMap.keySet());
- for (TestData.Key key : keySet) {
- Collection<TestData.Value> leftValues = leftMap.get(key);
- Collection<TestData.Value> rightValues = rightMap.get(key);
- ArrayList<Collection<TestData.Value>> list = new ArrayList<Collection<TestData.Value>>(2);
+ for (Integer key : keySet) {
+ Collection<String> leftValues = leftMap.get(key);
+ Collection<String> rightValues = rightMap.get(key);
+ ArrayList<Collection<String>> list = new ArrayList<>(2);
if (leftValues == null) {
- list.add(new ArrayList<TestData.Value>(0));
+ list.add(new ArrayList<String>(0));
} else {
list.add(leftValues);
}
if (rightValues == null) {
- list.add(new ArrayList<TestData.Value>(0));
+ list.add(new ArrayList<String>(0));
} else {
list.add(rightValues);
}
@@ -205,22 +202,22 @@ public class ReusingSortMergeCoGroupIteratorITCase
return map;
}
- private Map<TestData.Key, Collection<TestData.Value>> collectData(Generator iter, int num)
+ private Map<Integer, Collection<String>> collectData(TupleGenerator iter, int num)
throws Exception
{
- Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
- Record pair = new Record();
+ Map<Integer, Collection<String>> map = new HashMap<>();
+ Tuple2<Integer, String> pair = new Tuple2<>();
for (int i = 0; i < num; i++) {
iter.next(pair);
- TestData.Key key = pair.getField(0, TestData.Key.class);
+ int key = pair.f0;
if (!map.containsKey(key)) {
- map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+ map.put(key, new ArrayList<String>());
}
- Collection<TestData.Value> values = map.get(key);
- values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+ Collection<String> values = map.get(key);
+ values.add(pair.f1);
}
return map;
}