You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/15 15:07:24 UTC
[5/6] flink git commit: [FLINK-2479] Refactor runtime.operators.*
tests
[FLINK-2479] Refactor runtime.operators.* tests
This closes #1160
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fbc18b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fbc18b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fbc18b96
Branch: refs/heads/master
Commit: fbc18b96a86bc54da189f713ed01370524558249
Parents: d9e32da
Author: zentol <s....@web.de>
Authored: Sun Sep 20 19:10:26 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Oct 15 11:30:42 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/io/disk/ChannelViewsTest.java | 136 +++--
.../runtime/io/disk/SpillingBufferTest.java | 141 ++---
.../hash/NonReusingHashMatchIteratorITCase.java | 343 +++++------
.../NonReusingReOpenableHashTableITCase.java | 190 +++---
.../hash/ReusingHashMatchIteratorITCase.java | 335 +++++-----
.../hash/ReusingReOpenableHashTableITCase.java | 188 +++---
.../CombiningUnilateralSortMergerITCase.java | 144 +++--
.../operators/sort/ExternalSortITCase.java | 120 ++--
.../operators/sort/MergeIteratorTest.java | 98 ++-
...onReusingSortMergeCoGroupIteratorITCase.java | 128 ++--
.../operators/sort/NormalizedKeySorterTest.java | 171 +++---
.../ReusingSortMergeCoGroupIteratorITCase.java | 129 ++--
.../runtime/operators/testutils/TestData.java | 608 ++++++++++---------
.../operators/util/HashVsSortMiniBenchmark.java | 93 ++-
14 files changed, 1374 insertions(+), 1450 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
index a44916a..ba86131 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/ChannelViewsTest.java
@@ -36,11 +36,10 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -102,19 +101,19 @@ public class ChannelViewsTest
@Test
public void testWriteReadSmallRecords() throws Exception
{
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
final BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(channel);
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
-
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());
@@ -125,18 +124,18 @@ public class ChannelViewsTest
generator.reset();
// read and re-generate all records and compare them
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(inView.close());
@@ -146,8 +145,9 @@ public class ChannelViewsTest
@Test
public void testWriteAndReadLongRecords() throws Exception
{
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LONG_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -155,10 +155,10 @@ public class ChannelViewsTest
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_LONG; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());
@@ -169,15 +169,15 @@ public class ChannelViewsTest
generator.reset();
// read and re-generate all records and compare them
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_LONG; i++) {
generator.next(rec);
- readRec.read(inView);
- final Key k1 = rec.getField(0, Key.class);
- final Value v1 = rec.getField(1, Value.class);
- final Key k2 = readRec.getField(0, Key.class);
- final Value v2 = readRec.getField(1, Value.class);
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ serializer.deserialize(readRec, inView);
+ final int k1 = rec.f0;
+ final String v1 = rec.f1;
+ final int k2 = readRec.f0;
+ final String v2 = readRec.f1;
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(inView.close());
@@ -187,8 +187,9 @@ public class ChannelViewsTest
@Test
public void testReadTooMany() throws Exception
{
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -196,10 +197,10 @@ public class ChannelViewsTest
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());
@@ -211,15 +212,15 @@ public class ChannelViewsTest
// read and re-generate all records and compare them
try {
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT + 1; i++) {
generator.next(rec);
- readRec.read(inView);
- final Key k1 = rec.getField(0, Key.class);
- final Value v1 = rec.getField(1, Value.class);
- final Key k2 = readRec.getField(0, Key.class);
- final Value v2 = readRec.getField(1, Value.class);
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ serializer.deserialize(readRec, inView);
+ final int k1 = rec.f0;
+ final String v1 = rec.f1;
+ final int k2 = readRec.f0;
+ final String v2 = readRec.f1;
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
Assert.fail("Expected an EOFException which did not occur.");
}
@@ -238,8 +239,9 @@ public class ChannelViewsTest
@Test
public void testReadWithoutKnownBlockCount() throws Exception
{
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -247,10 +249,10 @@ public class ChannelViewsTest
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());
@@ -261,18 +263,18 @@ public class ChannelViewsTest
generator.reset();
// read and re-generate all records and cmpare them
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(inView.close());
@@ -282,8 +284,9 @@ public class ChannelViewsTest
@Test
public void testWriteReadOneBufferOnly() throws Exception
{
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, 1);
@@ -291,10 +294,10 @@ public class ChannelViewsTest
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());
@@ -305,18 +308,18 @@ public class ChannelViewsTest
generator.reset();
// read and re-generate all records and compare them
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(inView.close());
@@ -326,8 +329,9 @@ public class ChannelViewsTest
@Test
public void testWriteReadNotAll() throws Exception
{
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_SHORT_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
final FileIOChannel.ID channel = this.ioManager.createChannel();
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
List<MemorySegment> memory = this.memoryManager.allocatePages(this.parentTask, NUM_MEMORY_SEGMENTS);
@@ -335,10 +339,10 @@ public class ChannelViewsTest
final ChannelWriterOutputView outView = new ChannelWriterOutputView(writer, memory, MEMORY_PAGE_SIZE);
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
this.memoryManager.release(outView.close());
@@ -349,18 +353,18 @@ public class ChannelViewsTest
generator.reset();
// read and re-generate all records and compare them
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_SHORT / 2; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(inView.close());
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
index 0b1e0c3..538c416 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/SpillingBufferTest.java
@@ -27,11 +27,10 @@ import org.apache.flink.runtime.memory.ListMemorySegmentSource;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Key;
-import org.apache.flink.runtime.operators.testutils.TestData.Value;
-import org.apache.flink.types.Record;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.junit.After;
import org.junit.Assert;
@@ -90,7 +89,8 @@ public class SpillingBufferTest {
@Test
public void testWriteReadInMemory() throws Exception {
- final TestData.Generator generator = new TestData.Generator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -99,10 +99,10 @@ public class SpillingBufferTest {
new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
// create the reader input view
@@ -110,18 +110,18 @@ public class SpillingBufferTest {
generator.reset();
// read and re-generate all records and compare them
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
// re-read the data
@@ -131,15 +131,15 @@ public class SpillingBufferTest {
// read and re-generate all records and compare them
for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(outView.close());
@@ -148,8 +148,9 @@ public class SpillingBufferTest {
@Test
public void testWriteReadTooMuchInMemory() throws Exception {
- final TestData.Generator generator = new TestData.Generator(
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(
SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -158,10 +159,10 @@ public class SpillingBufferTest {
new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
// create the reader input view
@@ -169,19 +170,19 @@ public class SpillingBufferTest {
generator.reset();
// read and re-generate all records and compare them
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
try {
for (int i = 0; i < NUM_PAIRS_INMEM + 1; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
Assert.fail("Read too much, expected EOFException.");
}
@@ -196,15 +197,15 @@ public class SpillingBufferTest {
// read and re-generate all records and compare them
for (int i = 0; i < NUM_PAIRS_INMEM; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(outView.close());
@@ -215,8 +216,9 @@ public class SpillingBufferTest {
@Test
public void testWriteReadExternal() throws Exception {
- final TestData.Generator generator = new TestData.Generator(
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(
SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -225,10 +227,10 @@ public class SpillingBufferTest {
new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
// create the reader input view
@@ -236,18 +238,18 @@ public class SpillingBufferTest {
generator.reset();
// read and re-generate all records and compare them
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
// re-read the data
@@ -257,15 +259,15 @@ public class SpillingBufferTest {
// read and re-generate all records and compare them
for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(outView.close());
@@ -274,8 +276,9 @@ public class SpillingBufferTest {
@Test
public void testWriteReadTooMuchExternal() throws Exception {
- final TestData.Generator generator = new TestData.Generator(
+ final TestData.TupleGenerator generator = new TestData.TupleGenerator(
SEED, KEY_MAX, VALUE_LENGTH, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TypeSerializer<Tuple2<Integer, String>> serializer = TestData.getIntStringTupleSerializer();
// create the writer output view
final ArrayList<MemorySegment> memory = new ArrayList<MemorySegment>(NUM_MEMORY_SEGMENTS);
@@ -284,10 +287,10 @@ public class SpillingBufferTest {
new ListMemorySegmentSource(memory), this.memoryManager.getPageSize());
// write a number of pairs
- final Record rec = new Record();
+ final Tuple2<Integer, String> rec = new Tuple2<>();
for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
generator.next(rec);
- rec.write(outView);
+ serializer.serialize(rec, outView);
}
// create the reader input view
@@ -295,19 +298,19 @@ public class SpillingBufferTest {
generator.reset();
// read and re-generate all records and compare them
- final Record readRec = new Record();
+ final Tuple2<Integer, String> readRec = new Tuple2<>();
try {
for (int i = 0; i < NUM_PAIRS_EXTERNAL + 1; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
Assert.fail("Read too much, expected EOFException.");
}
@@ -322,15 +325,15 @@ public class SpillingBufferTest {
// read and re-generate all records and compare them
for (int i = 0; i < NUM_PAIRS_EXTERNAL; i++) {
generator.next(rec);
- readRec.read(inView);
+ serializer.deserialize(readRec, inView);
- Key k1 = rec.getField(0, Key.class);
- Value v1 = rec.getField(1, Value.class);
+ int k1 = rec.f0;
+ String v1 = rec.f1;
- Key k2 = readRec.getField(0, Key.class);
- Value v2 = readRec.getField(1, Value.class);
+ int k2 = readRec.f0;
+ String v2 = readRec.f1;
- Assert.assertTrue("The re-generated and the read record do not match.", k1.equals(k2) && v1.equals(v2));
+ Assert.assertTrue("The re-generated and the read record do not match.", k1 == k2 && v1.equals(v2));
}
this.memoryManager.release(outView.close());
http://git-wip-us.apache.org/repos/asf/flink/blob/fbc18b96/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
index 2da97e9..1795062 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashMatchIteratorITCase.java
@@ -19,15 +19,10 @@
package org.apache.flink.runtime.operators.hash;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparator;
-import org.apache.flink.api.common.typeutils.record.RecordSerializer;
-import org.apache.flink.api.java.record.functions.JoinFunction;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -35,18 +30,14 @@ import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.TestData;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.KeyMode;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator.ValueMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode;
+import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode;
import org.apache.flink.runtime.operators.testutils.UniformIntPairGenerator;
import org.apache.flink.runtime.operators.testutils.UnionIterator;
import org.apache.flink.runtime.operators.testutils.types.IntPair;
-import org.apache.flink.runtime.operators.testutils.types.IntPairComparator;
import org.apache.flink.runtime.operators.testutils.types.IntPairSerializer;
-import org.apache.flink.types.IntValue;
import org.apache.flink.types.NullKeyFieldException;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
@@ -60,6 +51,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.flink.api.common.typeutils.GenericPairComparator;
+import org.apache.flink.api.java.tuple.Tuple2;
@SuppressWarnings({"serial", "deprecation"})
public class NonReusingHashMatchIteratorITCase {
@@ -77,31 +70,31 @@ public class NonReusingHashMatchIteratorITCase {
private IOManager ioManager;
private MemoryManager memoryManager;
- private TypeSerializer<Record> recordSerializer;
- private TypeComparator<Record> record1Comparator;
- private TypeComparator<Record> record2Comparator;
- private TypePairComparator<Record, Record> recordPairComparator;
+ private TypeSerializer<Tuple2<Integer, String>> recordSerializer;
+ private TypeComparator<Tuple2<Integer, String>> record1Comparator;
+ private TypeComparator<Tuple2<Integer, String>> record2Comparator;
+ private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> recordPairComparator;
private TypeSerializer<IntPair> pairSerializer;
private TypeComparator<IntPair> pairComparator;
- private TypePairComparator<IntPair, Record> pairRecordPairComparator;
- private TypePairComparator<Record, IntPair> recordPairPairComparator;
+ private TypePairComparator<IntPair, Tuple2<Integer, String>> pairRecordPairComparator;
+ private TypePairComparator<Tuple2<Integer, String>, IntPair> recordPairPairComparator;
@SuppressWarnings("unchecked")
@Before
public void beforeTest() {
- this.recordSerializer = RecordSerializer.get();
+ this.recordSerializer = TestData.getIntStringTupleSerializer();
- this.record1Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
- this.record2Comparator = new RecordComparator(new int[] {0}, new Class[] {TestData.Key.class});
+ this.record1Comparator = TestData.getIntStringTupleComparator();
+ this.record2Comparator = TestData.getIntStringTupleComparator();
- this.recordPairComparator = new RecordPairComparator(new int[] {0}, new int[] {0}, new Class[] {TestData.Key.class});
+ this.recordPairComparator = new GenericPairComparator(record1Comparator, record2Comparator);
this.pairSerializer = new IntPairSerializer();
- this.pairComparator = new IntPairComparator();
- this.pairRecordPairComparator = new IntPairRecordPairComparator();
- this.recordPairPairComparator = new RecordIntPairPairComparator();
+ this.pairComparator = new TestData.IntPairComparator();
+ this.pairRecordPairComparator = new IntPairTuplePairComparator();
+ this.recordPairPairComparator = new TupleIntPairPairComparator();
this.memoryManager = new MemoryManager(MEMORY_SIZE, 1);
this.ioManager = new IOManagerAsync();
@@ -129,19 +122,19 @@ public class NonReusingHashMatchIteratorITCase {
@Test
public void testBuildFirst() {
try {
- Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
- collectRecordData(input1),
- collectRecordData(input2));
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+ collectTupleData(input1),
+ collectTupleData(input2));
- final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
// reset the generators
generator1.reset();
@@ -150,8 +143,8 @@ public class NonReusingHashMatchIteratorITCase {
input2.reset();
// compare with iterator values
- NonReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
- new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+ NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new NonReusingBuildFirstHashMatchIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -163,7 +156,7 @@ public class NonReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -187,31 +180,31 @@ public class NonReusingHashMatchIteratorITCase {
final int DUPLICATE_KEY = 13;
try {
- Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
- final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
- final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+ final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+ final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
- final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
inList1.add(gen1Iter);
inList1.add(const1Iter);
- final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
inList2.add(gen2Iter);
inList2.add(const2Iter);
- MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
- MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+ MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+ MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
// collect expected data
- final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
- collectRecordData(input1),
- collectRecordData(input2));
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+ collectTupleData(input1),
+ collectTupleData(input2));
// re-create the whole thing for actual processing
@@ -231,14 +224,14 @@ public class NonReusingHashMatchIteratorITCase {
inList2.add(gen2Iter);
inList2.add(const2Iter);
- input1 = new UnionIterator<Record>(inList1);
- input2 = new UnionIterator<Record>(inList2);
+ input1 = new UnionIterator<>(inList1);
+ input2 = new UnionIterator<>(inList2);
- final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
- NonReusingBuildFirstHashMatchIterator<Record, Record, Record> iterator =
- new NonReusingBuildFirstHashMatchIterator<Record, Record, Record>(
+ NonReusingBuildFirstHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new NonReusingBuildFirstHashMatchIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -250,7 +243,7 @@ public class NonReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -265,19 +258,19 @@ public class NonReusingHashMatchIteratorITCase {
@Test
public void testBuildSecond() {
try {
- Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input1 = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
- collectRecordData(input1),
- collectRecordData(input2));
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+ collectTupleData(input1),
+ collectTupleData(input2));
- final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
generator1.reset();
@@ -286,8 +279,8 @@ public class NonReusingHashMatchIteratorITCase {
input2.reset();
// compare with iterator values
- NonReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
- new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+ NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new NonReusingBuildSecondHashMatchIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -299,7 +292,7 @@ public class NonReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -323,31 +316,31 @@ public class NonReusingHashMatchIteratorITCase {
final int DUPLICATE_KEY = 13;
try {
- Generator generator1 = new Generator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator gen1Iter = new TestData.GeneratorIterator(generator1, INPUT_1_SIZE);
- final TestData.GeneratorIterator gen2Iter = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TestData.TupleGeneratorIterator gen1Iter = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+ final TestData.TupleGeneratorIterator gen2Iter = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
- final TestData.ConstantValueIterator const1Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
- final TestData.ConstantValueIterator const2Iter = new TestData.ConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
+ final TestData.TupleConstantValueIterator const1Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "LEFT String for Duplicate Keys", INPUT_1_DUPLICATES);
+ final TestData.TupleConstantValueIterator const2Iter = new TestData.TupleConstantValueIterator(DUPLICATE_KEY, "RIGHT String for Duplicate Keys", INPUT_2_DUPLICATES);
- final List<MutableObjectIterator<Record>> inList1 = new ArrayList<MutableObjectIterator<Record>>();
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
inList1.add(gen1Iter);
inList1.add(const1Iter);
- final List<MutableObjectIterator<Record>> inList2 = new ArrayList<MutableObjectIterator<Record>>();
+ final List<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
inList2.add(gen2Iter);
inList2.add(const2Iter);
- MutableObjectIterator<Record> input1 = new UnionIterator<Record>(inList1);
- MutableObjectIterator<Record> input2 = new UnionIterator<Record>(inList2);
+ MutableObjectIterator<Tuple2<Integer, String>> input1 = new UnionIterator<>(inList1);
+ MutableObjectIterator<Tuple2<Integer, String>> input2 = new UnionIterator<>(inList2);
// collect expected data
- final Map<TestData.Key, Collection<RecordMatch>> expectedMatchesMap = matchRecordValues(
- collectRecordData(input1),
- collectRecordData(input2));
+ final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = matchSecondTupleFields(
+ collectTupleData(input1),
+ collectTupleData(input2));
// re-create the whole thing for actual processing
@@ -367,14 +360,14 @@ public class NonReusingHashMatchIteratorITCase {
inList2.add(gen2Iter);
inList2.add(const2Iter);
- input1 = new UnionIterator<Record>(inList1);
- input2 = new UnionIterator<Record>(inList2);
+ input1 = new UnionIterator<>(inList1);
+ input2 = new UnionIterator<>(inList2);
- final JoinFunction matcher = new RecordMatchRemovingJoin(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
- NonReusingBuildSecondHashMatchIterator<Record, Record, Record> iterator =
- new NonReusingBuildSecondHashMatchIterator<Record, Record, Record>(
+ NonReusingBuildSecondHashMatchIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new NonReusingBuildSecondHashMatchIterator<>(
input1, input2, this.recordSerializer, this.record1Comparator,
this.recordSerializer, this.record2Comparator, this.recordPairComparator,
this.memoryManager, ioManager, this.parentTask, 1.0, true);
@@ -386,7 +379,7 @@ public class NonReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -403,16 +396,16 @@ public class NonReusingHashMatchIteratorITCase {
try {
MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
- final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+ final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
collectIntPairData(input1),
- collectRecordData(input2));
+ collectTupleData(input2));
- final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<Tuple2<Integer, String>>();
// reset the generators
input1 = new UniformIntPairGenerator(500, 40, false);
@@ -420,8 +413,8 @@ public class NonReusingHashMatchIteratorITCase {
input2.reset();
// compare with iterator values
- NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record> iterator =
- new NonReusingBuildSecondHashMatchIterator<IntPair, Record, Record>(
+ NonReusingBuildSecondHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new NonReusingBuildSecondHashMatchIterator<>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -433,7 +426,7 @@ public class NonReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -450,16 +443,16 @@ public class NonReusingHashMatchIteratorITCase {
try {
MutableObjectIterator<IntPair> input1 = new UniformIntPairGenerator(500, 40, false);
- final Generator generator2 = new Generator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
- final TestData.GeneratorIterator input2 = new TestData.GeneratorIterator(generator2, INPUT_2_SIZE);
+ final TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+ final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
// collect expected data
- final Map<TestData.Key, Collection<RecordIntPairMatch>> expectedMatchesMap = matchRecordIntPairValues(
+ final Map<Integer, Collection<TupleIntPairMatch>> expectedMatchesMap = matchTupleIntPairValues(
collectIntPairData(input1),
- collectRecordData(input2));
+ collectTupleData(input2));
- final FlatJoinFunction<IntPair, Record, Record> matcher = new RecordIntPairMatchRemovingMatcher(expectedMatchesMap);
- final Collector<Record> collector = new DiscardingOutputCollector<Record>();
+ final FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleIntPairMatchRemovingMatcher(expectedMatchesMap);
+ final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
// reset the generators
input1 = new UniformIntPairGenerator(500, 40, false);
@@ -467,8 +460,8 @@ public class NonReusingHashMatchIteratorITCase {
input2.reset();
// compare with iterator values
- NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record> iterator =
- new NonReusingBuildFirstHashMatchIterator<IntPair, Record, Record>(
+ NonReusingBuildFirstHashMatchIterator<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+ new NonReusingBuildFirstHashMatchIterator<>(
input1, input2, this.pairSerializer, this.pairComparator,
this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
this.memoryManager, this.ioManager, this.parentTask, 1.0, true);
@@ -480,7 +473,7 @@ public class NonReusingHashMatchIteratorITCase {
iterator.close();
// assert that each expected match was seen
- for (Entry<TestData.Key, Collection<RecordIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
+ for (Entry<Integer, Collection<TupleIntPairMatch>> entry : expectedMatchesMap.entrySet()) {
if (!entry.getValue().isEmpty()) {
Assert.fail("Collection for key " + entry.getKey() + " is not empty");
}
@@ -498,29 +491,29 @@ public class NonReusingHashMatchIteratorITCase {
- static Map<TestData.Key, Collection<RecordMatch>> matchRecordValues(
- Map<TestData.Key, Collection<TestData.Value>> leftMap,
- Map<TestData.Key, Collection<TestData.Value>> rightMap)
+ static Map<Integer, Collection<TupleMatch>> matchSecondTupleFields(
+ Map<Integer, Collection<String>> leftMap,
+ Map<Integer, Collection<String>> rightMap)
{
- Map<TestData.Key, Collection<RecordMatch>> map = new HashMap<TestData.Key, Collection<RecordMatch>>();
+ Map<Integer, Collection<TupleMatch>> map = new HashMap<>();
- for (TestData.Key key : leftMap.keySet()) {
- Collection<TestData.Value> leftValues = leftMap.get(key);
- Collection<TestData.Value> rightValues = rightMap.get(key);
+ for (Integer key : leftMap.keySet()) {
+ Collection<String> leftValues = leftMap.get(key);
+ Collection<String> rightValues = rightMap.get(key);
if (rightValues == null) {
continue;
}
if (!map.containsKey(key)) {
- map.put(key, new ArrayList<RecordMatch>());
+ map.put(key, new ArrayList<TupleMatch>());
}
- Collection<RecordMatch> matchedValues = map.get(key);
+ Collection<TupleMatch> matchedValues = map.get(key);
- for (TestData.Value leftValue : leftValues) {
- for (TestData.Value rightValue : rightValues) {
- matchedValues.add(new RecordMatch(leftValue, rightValue));
+ for (String leftValue : leftValues) {
+ for (String rightValue : rightValues) {
+ matchedValues.add(new TupleMatch(leftValue, rightValue));
}
}
}
@@ -528,32 +521,30 @@ public class NonReusingHashMatchIteratorITCase {
return map;
}
- static Map<TestData.Key, Collection<RecordIntPairMatch>> matchRecordIntPairValues(
+ static Map<Integer, Collection<TupleIntPairMatch>> matchTupleIntPairValues(
Map<Integer, Collection<Integer>> leftMap,
- Map<TestData.Key, Collection<TestData.Value>> rightMap)
+ Map<Integer, Collection<String>> rightMap)
{
- final Map<TestData.Key, Collection<RecordIntPairMatch>> map = new HashMap<TestData.Key, Collection<RecordIntPairMatch>>();
+ final Map<Integer, Collection<TupleIntPairMatch>> map = new HashMap<>();
for (Integer i : leftMap.keySet()) {
- final TestData.Key key = new TestData.Key(i.intValue());
-
final Collection<Integer> leftValues = leftMap.get(i);
- final Collection<TestData.Value> rightValues = rightMap.get(key);
+ final Collection<String> rightValues = rightMap.get(i);
if (rightValues == null) {
continue;
}
- if (!map.containsKey(key)) {
- map.put(key, new ArrayList<RecordIntPairMatch>());
+ if (!map.containsKey(i)) {
+ map.put(i, new ArrayList<TupleIntPairMatch>());
}
- final Collection<RecordIntPairMatch> matchedValues = map.get(key);
+ final Collection<TupleIntPairMatch> matchedValues = map.get(i);
for (Integer v : leftValues) {
- for (TestData.Value val : rightValues) {
- matchedValues.add(new RecordIntPairMatch(v, val));
+ for (String val : rightValues) {
+ matchedValues.add(new TupleIntPairMatch(v, val));
}
}
}
@@ -562,21 +553,21 @@ public class NonReusingHashMatchIteratorITCase {
}
- static Map<TestData.Key, Collection<TestData.Value>> collectRecordData(MutableObjectIterator<Record> iter)
+ static Map<Integer, Collection<String>> collectTupleData(MutableObjectIterator<Tuple2<Integer, String>> iter)
throws Exception
{
- Map<TestData.Key, Collection<TestData.Value>> map = new HashMap<TestData.Key, Collection<TestData.Value>>();
- Record pair = new Record();
+ Map<Integer, Collection<String>> map = new HashMap<>();
+ Tuple2<Integer, String> pair = new Tuple2<>();
while ((pair = iter.next(pair)) != null) {
- TestData.Key key = pair.getField(0, TestData.Key.class);
+ Integer key = pair.f0;
if (!map.containsKey(key)) {
- map.put(new TestData.Key(key.getKey()), new ArrayList<TestData.Value>());
+ map.put(key, new ArrayList<String>());
}
- Collection<TestData.Value> values = map.get(key);
- values.add(new TestData.Value(pair.getField(1, TestData.Value.class).getValue()));
+ Collection<String> values = map.get(key);
+ values.add(pair.f1);
}
return map;
@@ -606,19 +597,19 @@ public class NonReusingHashMatchIteratorITCase {
/**
* Private class used for storage of the expected matches in a hash-map.
*/
- static class RecordMatch {
+ static class TupleMatch {
- private final Value left;
- private final Value right;
+ private final String left;
+ private final String right;
- public RecordMatch(Value left, Value right) {
+ public TupleMatch(String left, String right) {
this.left = left;
this.right = right;
}
@Override
public boolean equals(Object obj) {
- RecordMatch o = (RecordMatch) obj;
+ TupleMatch o = (TupleMatch) obj;
return this.left.equals(o.left) && this.right.equals(o.right);
}
@@ -636,19 +627,19 @@ public class NonReusingHashMatchIteratorITCase {
/**
* Private class used for storage of the expected matches in a hash-map.
*/
- static class RecordIntPairMatch
+ static class TupleIntPairMatch
{
private final int left;
- private final Value right;
+ private final String right;
- public RecordIntPairMatch(int left, Value right) {
+ public TupleIntPairMatch(int left, String right) {
this.left = left;
- this.right = right;
+ this.right = new String(right);
}
@Override
public boolean equals(Object obj) {
- RecordIntPairMatch o = (RecordIntPairMatch) obj;
+ TupleIntPairMatch o = (TupleIntPairMatch) obj;
return this.left == o.left && this.right.equals(o.right);
}
@@ -663,28 +654,28 @@ public class NonReusingHashMatchIteratorITCase {
}
}
- static final class RecordMatchRemovingJoin extends JoinFunction
+ static final class TupleMatchRemovingJoin implements FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>>
{
- private final Map<TestData.Key, Collection<RecordMatch>> toRemoveFrom;
+ private final Map<Integer, Collection<TupleMatch>> toRemoveFrom;
- protected RecordMatchRemovingJoin(Map<TestData.Key, Collection<RecordMatch>> map) {
+ protected TupleMatchRemovingJoin(Map<Integer, Collection<TupleMatch>> map) {
this.toRemoveFrom = map;
}
@Override
- public void join(Record rec1, Record rec2, Collector<Record> out) throws Exception
+ public void join(Tuple2<Integer, String> rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
{
- TestData.Key key = rec1.getField(0, TestData.Key.class);
- TestData.Value value1 = rec1.getField(1, TestData.Value.class);
- TestData.Value value2 = rec2.getField(1, TestData.Value.class);
- //System.err.println("rec1 key = "+key+" rec2 key= "+rec2.getField(0, TestData.Key.class));
- Collection<RecordMatch> matches = this.toRemoveFrom.get(key);
+ int key = rec1.f0;
+ String value1 = rec1.f1;
+ String value2 = rec2.f1;
+ //System.err.println("rec1 key = "+key+" rec2 key= "+rec2.f0);
+ Collection<TupleMatch> matches = this.toRemoveFrom.get(key);
if (matches == null) {
Assert.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
}
Assert.assertTrue("Produced match was not contained: " + key + " - " + value1 + ":" + value2,
- matches.remove(new RecordMatch(value1, value2)));
+ matches.remove(new TupleMatch(value1, value2)));
if (matches.isEmpty()) {
this.toRemoveFrom.remove(key);
@@ -692,32 +683,32 @@ public class NonReusingHashMatchIteratorITCase {
}
}
- static final class RecordIntPairMatchRemovingMatcher extends AbstractRichFunction implements FlatJoinFunction<IntPair, Record, Record>
+ static final class TupleIntPairMatchRemovingMatcher implements FlatJoinFunction<IntPair, Tuple2<Integer, String>, Tuple2<Integer, String>>
{
- private final Map<TestData.Key, Collection<RecordIntPairMatch>> toRemoveFrom;
+ private final Map<Integer, Collection<TupleIntPairMatch>> toRemoveFrom;
- protected RecordIntPairMatchRemovingMatcher(Map<TestData.Key, Collection<RecordIntPairMatch>> map) {
+ protected TupleIntPairMatchRemovingMatcher(Map<Integer, Collection<TupleIntPairMatch>> map) {
this.toRemoveFrom = map;
}
@Override
- public void join(IntPair rec1, Record rec2, Collector<Record> out) throws Exception
+ public void join(IntPair rec1, Tuple2<Integer, String> rec2, Collector<Tuple2<Integer, String>> out) throws Exception
{
final int k = rec1.getKey();
final int v = rec1.getValue();
- final TestData.Key key = rec2.getField(0, TestData.Key.class);
- final TestData.Value value = rec2.getField(1, TestData.Value.class);
-
- Assert.assertTrue("Key does not match for matching IntPair Record combination.", k == key.getKey());
+ final Integer key = rec2.f0;
+ final String value = rec2.f1;
+
+ Assert.assertTrue("Key does not match for matching IntPair Tuple combination.", k == key);
- Collection<RecordIntPairMatch> matches = this.toRemoveFrom.get(key);
+ Collection<TupleIntPairMatch> matches = this.toRemoveFrom.get(key);
if (matches == null) {
Assert.fail("Match " + key + " - " + v + ":" + value + " is unexpected.");
}
Assert.assertTrue("Produced match was not contained: " + key + " - " + v + ":" + value,
- matches.remove(new RecordIntPairMatch(v, value)));
+ matches.remove(new TupleIntPairMatch(v, value)));
if (matches.isEmpty()) {
this.toRemoveFrom.remove(key);
@@ -725,7 +716,7 @@ public class NonReusingHashMatchIteratorITCase {
}
}
- static final class IntPairRecordPairComparator extends TypePairComparator<IntPair, Record>
+ static final class IntPairTuplePairComparator extends TypePairComparator<IntPair, Tuple2<Integer, String>>
{
private int reference;
@@ -735,33 +726,31 @@ public class NonReusingHashMatchIteratorITCase {
}
@Override
- public boolean equalToReference(Record candidate) {
+ public boolean equalToReference(Tuple2<Integer, String> candidate) {
try {
- final IntValue i = candidate.getField(0, IntValue.class);
- return i.getValue() == this.reference;
+ return candidate.f0 == this.reference;
} catch (NullPointerException npex) {
throw new NullKeyFieldException();
}
}
@Override
- public int compareToReference(Record candidate) {
+ public int compareToReference(Tuple2<Integer, String> candidate) {
try {
- final IntValue i = candidate.getField(0, IntValue.class);
- return i.getValue() - this.reference;
+ return candidate.f0 - this.reference;
} catch (NullPointerException npex) {
throw new NullKeyFieldException();
}
}
}
- static final class RecordIntPairPairComparator extends TypePairComparator<Record, IntPair>
+ static final class TupleIntPairPairComparator extends TypePairComparator<Tuple2<Integer, String>, IntPair>
{
private int reference;
@Override
- public void setReference(Record reference) {
- this.reference = reference.getField(0, IntValue.class).getValue();
+ public void setReference(Tuple2<Integer, String> reference) {
+ this.reference = reference.f0;
}
@Override