You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 16:06:09 UTC
[4/4] flink git commit: [FLINK-1982] [record-api] Remove dependencies
on Record API from flink-runtime tests
[FLINK-1982] [record-api] Remove dependencies on Record API from flink-runtime tests
Rename Match*Test to Join*Test and MapTaskTest to FlatMapTaskTest
This closes #1294
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0d7073a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0d7073a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0d7073a
Branch: refs/heads/release-0.10
Commit: c0d7073a5166fdcd6cfbea741ae0bfa132e2bf04
Parents: 712c868
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Oct 22 21:10:41 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Oct 23 16:05:26 2015 +0200
----------------------------------------------------------------------
.../operators/CombineTaskExternalITCase.java | 16 +-
.../flink/runtime/operators/CrossTaskTest.java | 41 +-
.../runtime/operators/DataSinkTaskTest.java | 52 +-
.../runtime/operators/DataSourceTaskTest.java | 16 +-
.../runtime/operators/FlatMapTaskTest.java | 149 +++
.../operators/JoinTaskExternalITCase.java | 165 +++
.../flink/runtime/operators/JoinTaskTest.java | 1017 +++++++++++++++++
.../flink/runtime/operators/MapTaskTest.java | 151 ---
.../operators/MatchTaskExternalITCase.java | 167 ---
.../flink/runtime/operators/MatchTaskTest.java | 1019 ------------------
.../operators/ReduceTaskExternalITCase.java | 16 +-
.../flink/runtime/operators/ReduceTaskTest.java | 19 +-
.../operators/chaining/ChainTaskTest.java | 25 +-
.../operators/testutils/TaskTestBase.java | 18 +-
14 files changed, 1428 insertions(+), 1443 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index 4905e57..800bca7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
import java.util.HashMap;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -42,7 +42,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
private final double combine_frac;
- private final ArrayList<Record> outList = new ArrayList<Record>();
+ private final ArrayList<Record> outList = new ArrayList<>();
@SuppressWarnings("unchecked")
private final RecordComparator comparator = new RecordComparator(
@@ -69,7 +69,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
getTaskConfig().setRelativeMemoryDriver(combine_frac);
getTaskConfig().setFilehandlesDriver(2);
- final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
+ final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();
try {
testDriver(testTask, MockCombiningReduceStub.class);
@@ -85,7 +85,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
// wee need to do the final aggregation manually in the test, because the
// combiner is not guaranteed to do that
- final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
+ final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
for (Record record : this.outList) {
IntValue key = new IntValue();
IntValue value = new IntValue();
@@ -123,7 +123,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
getTaskConfig().setRelativeMemoryDriver(combine_frac);
getTaskConfig().setFilehandlesDriver(2);
- final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
+ final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<>();
try {
testDriver(testTask, MockCombiningReduceStub.class);
@@ -139,7 +139,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
// wee need to do the final aggregation manually in the test, because the
// combiner is not guaranteed to do that
- final HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
+ final HashMap<IntValue, IntValue> aggMap = new HashMap<>();
for (Record record : this.outList) {
IntValue key = new IntValue();
IntValue value = new IntValue();
@@ -166,7 +166,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
// ------------------------------------------------------------------------
// ------------------------------------------------------------------------
- @ReduceOperator.Combinable
+ @Combinable
public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
@@ -194,7 +194,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
}
}
- @ReduceOperator.Combinable
+ @Combinable
public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
index 4c27a68..2d8838a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CrossTaskTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.Record;
import org.junit.Test;
-@SuppressWarnings("deprecation")
public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record, Record>> {
private static final long CROSS_MEM = 1024 * 1024;
@@ -65,7 +64,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockCrossStub.class);
@@ -95,7 +94,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockCrossStub.class);
@@ -123,7 +122,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockFailingCrossStub.class);
@@ -153,7 +152,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockFailingCrossStub.class);
@@ -184,7 +183,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockCrossStub.class);
@@ -215,7 +214,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockCrossStub.class);
@@ -243,7 +242,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockFailingCrossStub.class);
@@ -272,7 +271,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockFailingCrossStub.class);
@@ -303,7 +302,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockCrossStub.class);
@@ -333,7 +332,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockCrossStub.class);
@@ -363,7 +362,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockCrossStub.class);
@@ -393,7 +392,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
try {
testDriver(testTask, MockCrossStub.class);
@@ -420,7 +419,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -463,7 +462,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_BLOCKED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -485,7 +484,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
try {
tct.join();
- taskRunner.join();
+ taskRunner.join();
} catch(InterruptedException ie) {
Assert.fail("Joining threads failed");
}
@@ -506,7 +505,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_FIRST);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -549,7 +548,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
getTaskConfig().setDriverStrategy(DriverStrategy.NESTEDLOOP_STREAMED_OUTER_SECOND);
getTaskConfig().setRelativeMemoryDriver(cross_frac);
- final CrossDriver<Record, Record, Record> testTask = new CrossDriver<Record, Record, Record>();
+ final CrossDriver<Record, Record, Record> testTask = new CrossDriver<>();
final AtomicBoolean success = new AtomicBoolean(false);
@@ -571,7 +570,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
try {
tct.join();
- taskRunner.join();
+ taskRunner.join();
} catch(InterruptedException ie) {
Assert.fail("Joining threads failed");
}
@@ -579,7 +578,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
Assert.assertTrue("Exception was thrown despite proper canceling.", success.get());
}
- public static final class MockCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
+ public static final class MockCrossStub implements CrossFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
@Override
@@ -588,7 +587,7 @@ public class CrossTaskTest extends DriverTestBase<CrossFunction<Record, Record,
}
}
- public static final class MockFailingCrossStub extends org.apache.flink.api.java.record.functions.CrossFunction {
+ public static final class MockFailingCrossStub implements CrossFunction<Record, Record, Record> {
private static final long serialVersionUID = 1L;
private int cnt = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index e91d338..b741b64 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.operators;
+import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
import org.junit.After;
import org.junit.Assert;
@@ -81,7 +80,7 @@ public class DataSinkTaskTest extends TaskTestBase
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
- DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>();
super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
@@ -94,7 +93,7 @@ public class DataSinkTaskTest extends TaskTestBase
fr = new FileReader(tempTestFile);
br = new BufferedReader(fr);
- HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+ HashMap<Integer, HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
while (br.ready()) {
String line = br.readLine();
@@ -144,7 +143,7 @@ public class DataSinkTaskTest extends TaskTestBase
readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false);
readers[3] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 3, 0, false), 0, false);
- DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>();
super.registerFileOutputTask(testTask, MockOutputFormat.class, new File(tempTestPath).toURI().toString());
@@ -171,7 +170,7 @@ public class DataSinkTaskTest extends TaskTestBase
fr = new FileReader(tempTestFile);
br = new BufferedReader(fr);
- HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+ HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
while(br.ready()) {
String line = br.readLine();
@@ -219,12 +218,12 @@ public class DataSinkTaskTest extends TaskTestBase
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
- DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>();
// set sorting
super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
super.getTaskConfig().setInputComparator(
- new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})), 0);
+ new RecordComparatorFactory(new int[]{1},(new Class[]{IntValue.class})), 0);
super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -248,7 +247,7 @@ public class DataSinkTaskTest extends TaskTestBase
fr = new FileReader(tempTestFile);
br = new BufferedReader(fr);
- Set<Integer> keys = new HashSet<Integer>();
+ Set<Integer> keys = new HashSet<>();
int curVal = -1;
while(br.ready()) {
@@ -297,7 +296,7 @@ public class DataSinkTaskTest extends TaskTestBase
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
- DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>();
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
@@ -323,21 +322,20 @@ public class DataSinkTaskTest extends TaskTestBase
public void testFailingSortingDataSinkTask() {
int keyCnt = 100;
- int valCnt = 20;;
+ int valCnt = 20;
double memoryFraction = 1.0;
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
- DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+ DataSinkTask<Record> testTask = new DataSinkTask<>();
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
// set sorting
super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
super.getTaskConfig().setInputComparator(
- new RecordComparatorFactory(new int[]{1}, ((Class<? extends Key<?>>[]) new Class[]{IntValue.class})),
- 0);
+ new RecordComparatorFactory(new int[]{1}, ( new Class[]{IntValue.class})), 0);
super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -365,7 +363,7 @@ public class DataSinkTaskTest extends TaskTestBase
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new InfiniteInputIterator(), 0);
- final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+ final DataSinkTask<Record> testTask = new DataSinkTask<>();
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
@@ -407,15 +405,14 @@ public class DataSinkTaskTest extends TaskTestBase
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addInput(new InfiniteInputIterator(), 0);
- final DataSinkTask<Record> testTask = new DataSinkTask<Record>();
+ final DataSinkTask<Record> testTask = new DataSinkTask<>();
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
// set sorting
super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
super.getTaskConfig().setInputComparator(
- new RecordComparatorFactory(new int[]{1},((Class<? extends Key<?>>[])new Class[]{IntValue.class})),
- 0);
+ new RecordComparatorFactory(new int[]{1},(new Class[]{IntValue.class})), 0);
super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
@@ -447,7 +444,7 @@ public class DataSinkTaskTest extends TaskTestBase
}
- public static class MockOutputFormat extends DelimitedOutputFormat {
+ public static class MockOutputFormat extends FileOutputFormat<Record> {
private static final long serialVersionUID = 1L;
final StringBuilder bld = new StringBuilder();
@@ -458,8 +455,7 @@ public class DataSinkTaskTest extends TaskTestBase
}
@Override
- public int serializeRecord(Record rec, byte[] target) throws Exception
- {
+ public void writeRecord(Record rec) throws IOException {
IntValue key = rec.getField(0, IntValue.class);
IntValue value = rec.getField(1, IntValue.class);
@@ -467,14 +463,11 @@ public class DataSinkTaskTest extends TaskTestBase
this.bld.append(key.getValue());
this.bld.append('_');
this.bld.append(value.getValue());
+ this.bld.append('\n');
byte[] bytes = this.bld.toString().getBytes();
- if (bytes.length <= target.length) {
- System.arraycopy(bytes, 0, target, 0, bytes.length);
- return bytes.length;
- }
- // else
- return -bytes.length;
+
+ this.stream.write(bytes);
}
}
@@ -490,12 +483,11 @@ public class DataSinkTaskTest extends TaskTestBase
}
@Override
- public int serializeRecord(Record rec, byte[] target) throws Exception
- {
+ public void writeRecord(Record rec) throws IOException {
if (++this.cnt >= 10) {
throw new RuntimeException("Expected Test Exception");
}
- return super.serializeRecord(rec, target);
+ super.writeRecord(rec);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
index 4548410..96ae700 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
@@ -28,11 +28,11 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import org.apache.flink.api.common.io.DelimitedInputFormat;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.taskmanager.Task;
import org.junit.Assert;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -83,7 +83,7 @@ public class DataSourceTaskTest extends TaskTestBase {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addOutput(this.outList);
- DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+ DataSourceTask<Record> testTask = new DataSourceTask<>();
super.registerFileInputTask(testTask, MockInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
@@ -97,7 +97,7 @@ public class DataSourceTaskTest extends TaskTestBase {
Assert.assertTrue("Invalid output size. Expected: "+(keyCnt*valCnt)+" Actual: "+this.outList.size(),
this.outList.size() == keyCnt * valCnt);
- HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<Integer, HashSet<Integer>>(keyCnt);
+ HashMap<Integer,HashSet<Integer>> keyValueCountMap = new HashMap<>(keyCnt);
for (Record kvp : this.outList) {
@@ -138,7 +138,7 @@ public class DataSourceTaskTest extends TaskTestBase {
super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
super.addOutput(this.outList);
- DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+ DataSourceTask<Record> testTask = new DataSourceTask<>();
super.registerFileInputTask(testTask, MockFailingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
@@ -172,7 +172,7 @@ public class DataSourceTaskTest extends TaskTestBase {
Assert.fail("Unable to set-up test input file");
}
- final DataSourceTask<Record> testTask = new DataSourceTask<Record>();
+ final DataSourceTask<Record> testTask = new DataSourceTask<>();
super.registerFileInputTask(testTask, MockDelayingInputFormat.class, new File(tempTestPath).toURI().toString(), "\n");
@@ -232,7 +232,7 @@ public class DataSourceTaskTest extends TaskTestBase {
}
}
- public static class MockInputFormat extends DelimitedInputFormat {
+ public static class MockInputFormat extends DelimitedInputFormat<Record> {
private static final long serialVersionUID = 1L;
private final IntValue key = new IntValue();
@@ -257,7 +257,7 @@ public class DataSourceTaskTest extends TaskTestBase {
}
}
- public static class MockDelayingInputFormat extends DelimitedInputFormat {
+ public static class MockDelayingInputFormat extends DelimitedInputFormat<Record> {
private static final long serialVersionUID = 1L;
private final IntValue key = new IntValue();
@@ -289,7 +289,7 @@ public class DataSourceTaskTest extends TaskTestBase {
}
- public static class MockFailingInputFormat extends DelimitedInputFormat {
+ public static class MockFailingInputFormat extends DelimitedInputFormat<Record> {
private static final long serialVersionUID = 1L;
private final IntValue key = new IntValue();
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
new file mode 100644
index 0000000..3d7f99e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/FlatMapTaskTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
+import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class FlatMapTaskTest extends DriverTestBase<FlatMapFunction<Record, Record>> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FlatMapTaskTest.class);
+
+ private final CountingOutputCollector output = new CountingOutputCollector();
+
+
+ public FlatMapTaskTest(ExecutionConfig config) {
+ super(config, 0, 0);
+ }
+
+ @Test
+ public void testMapTask() {
+ final int keyCnt = 100;
+ final int valCnt = 20;
+
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+ setOutput(this.output);
+
+ final FlatMapDriver<Record, Record> testDriver = new FlatMapDriver<>();
+
+ try {
+ testDriver(testDriver, MockMapStub.class);
+ } catch (Exception e) {
+ LOG.debug("Exception while running the test driver.", e);
+ Assert.fail("Invoke method caused exception.");
+ }
+
+ Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, this.output.getNumberOfRecords());
+ }
+
+ @Test
+ public void testFailingMapTask() {
+ final int keyCnt = 100;
+ final int valCnt = 20;
+
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+ setOutput(new DiscardingOutputCollector<Record>());
+
+ final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>();
+ try {
+ testDriver(testTask, MockFailingMapStub.class);
+ Assert.fail("Function exception was not forwarded.");
+ } catch (ExpectedTestException e) {
+ // good!
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("Exception in test.");
+ }
+ }
+
+ @Test
+ public void testCancelMapTask() {
+ addInput(new InfiniteInputIterator());
+ setOutput(new DiscardingOutputCollector<Record>());
+
+ final FlatMapDriver<Record, Record> testTask = new FlatMapDriver<>();
+
+ final AtomicBoolean success = new AtomicBoolean(false);
+
+ final Thread taskRunner = new Thread() {
+ @Override
+ public void run() {
+ try {
+ testDriver(testTask, MockMapStub.class);
+ success.set(true);
+ } catch (Exception ie) {
+ ie.printStackTrace();
+ }
+ }
+ };
+ taskRunner.start();
+
+ TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
+ tct.start();
+
+ try {
+ tct.join();
+ taskRunner.join();
+ } catch(InterruptedException ie) {
+ Assert.fail("Joining threads failed");
+ }
+
+ Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+ }
+
+ public static class MockMapStub extends RichFlatMapFunction<Record, Record> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void flatMap(Record record, Collector<Record> out) throws Exception {
+ out.collect(record);
+ }
+
+ }
+
+ public static class MockFailingMapStub extends RichFlatMapFunction<Record, Record> {
+ private static final long serialVersionUID = 1L;
+
+ private int cnt = 0;
+
+ @Override
+ public void flatMap(Record record, Collector<Record> out) throws Exception {
+ if (++this.cnt >= 10) {
+ throw new ExpectedTestException();
+ }
+ out.collect(record);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
new file mode 100644
index 0000000..5b2e6eb
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskExternalITCase.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.junit.Assert;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+/**
+ * Runs the {@link JoinDriver} with the merge and the two hybrid-hash strategies on generated
+ * {@link Record} inputs and checks only the number of emitted records.
+ *
+ * NOTE(review): judging by the class name the inputs are meant to exceed the configured memory
+ * so that sorting / hashing goes out of core - confirm against DriverTestBase.
+ */
+public class JoinTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+
+    /** Passed to the base class constructor; also the numerator of {@link #hash_frac}. */
+    private static final long HASH_MEM = 4*1024*1024;
+
+    /** Passed to the base class constructor as the sort memory argument. */
+    private static final long SORT_MEM = 3*1024*1024;
+
+    /** Ten pages; numerator of {@link #bnljn_frac}. */
+    private static final long BNLJN_MEM = 10 * PAGE_SIZE;
+
+    /** BNLJN_MEM relative to the memory manager's total size; driver memory for the merge test. */
+    private final double bnljn_frac;
+
+    /** HASH_MEM relative to the memory manager's total size; driver memory for the hash tests. */
+    private final double hash_frac;
+
+    // Class<?>[] instead of the raw Class[] keeps the array creation free of raw types,
+    // matching the declaration style used in JoinTaskTest.
+    @SuppressWarnings("unchecked")
+    private final RecordComparator comparator1 = new RecordComparator(
+        new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+
+    @SuppressWarnings("unchecked")
+    private final RecordComparator comparator2 = new RecordComparator(
+        new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
+
+    /** Output sink shared by all tests; only its record count is inspected. */
+    private final CountingOutputCollector output = new CountingOutputCollector();
+
+    public JoinTaskExternalITCase(ExecutionConfig config) {
+        super(config, HASH_MEM, 2, SORT_MEM);
+        bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+        hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
+    }
+
+    /**
+     * INNER_MERGE join with both inputs fed through {@code addInputSorted}. Expects
+     * {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} output records.
+     */
+    @Test
+    public void testExternalSort1MatchTask() {
+        final int keyCnt1 = 16384*4;
+        final int valCnt1 = 2;
+
+        final int keyCnt2 = 8192;
+        final int valCnt2 = 4*2;
+
+        final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+
+        setOutput(this.output);
+        addDriverComparator(this.comparator1);
+        addDriverComparator(this.comparator2);
+        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+        getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+        getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+        setNumFileHandlesForSort(4);
+
+        final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+        try {
+            addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+            addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+            testDriver(testTask, MockMatchStub.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("The test caused an exception.");
+        }
+
+        Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+    }
+
+    /** Hybrid hash join with the strategy HYBRIDHASH_BUILD_FIRST. */
+    @Test
+    public void testExternalHash1MatchTask() {
+        runExternalHashJoin(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+    }
+
+    /** Hybrid hash join with the strategy HYBRIDHASH_BUILD_SECOND. */
+    @Test
+    public void testExternalHash2MatchTask() {
+        runExternalHashJoin(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+    }
+
+    /**
+     * Shared body of the two hash tests, which differ only in the driver strategy: joins
+     * 32768 keys x 8 values with 65536 keys x 8 values and checks the output count.
+     *
+     * @param strategy the hash strategy to configure on the task
+     */
+    private void runExternalHashJoin(DriverStrategy strategy) {
+        final int keyCnt1 = 32768;
+        final int valCnt1 = 8;
+
+        final int keyCnt2 = 65536;
+        final int valCnt2 = 8;
+
+        final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+
+        addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
+        addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
+        addDriverComparator(this.comparator1);
+        addDriverComparator(this.comparator2);
+        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+        setOutput(this.output);
+        getTaskConfig().setDriverStrategy(strategy);
+        getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+        JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+        try {
+            testDriver(testTask, MockMatchStub.class);
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail("Test caused an exception.");
+        }
+
+        Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
+    }
+
+    /** Join function that emits the first record of every joined pair. */
+    public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
+            out.collect(value1);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
new file mode 100644
index 0000000..ecde59e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/JoinTaskTest.java
@@ -0,0 +1,1017 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.typeutils.record.RecordComparator;
+import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
+import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
+import org.apache.flink.runtime.operators.testutils.DriverTestBase;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
+import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.Key;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class JoinTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
+
+ /** Passed to the base class constructor; also the numerator of {@link #hash_frac}. */
+ private static final long HASH_MEM = 6 * 1024 * 1024;
+
+ /** Passed to the base class constructor as the sort memory argument. */
+ private static final long SORT_MEM = 3 * 1024 * 1024;
+
+ /** Passed to the base class constructor between HASH_MEM and SORT_MEM. */
+ private static final int NUM_SORTER = 2;
+
+ /** Ten pages; numerator of {@link #bnljn_frac}. */
+ private static final long BNLJN_MEM = 10 * PAGE_SIZE;
+
+ /** BNLJN_MEM relative to the memory manager's size; driver memory for the merge tests. */
+ private final double bnljn_frac;
+
+ /** HASH_MEM relative to the memory manager's size; driver memory for the hash tests. */
+ private final double hash_frac;
+
+ /** Comparator on field 0 (IntValue), registered as the first driver comparator. */
+ @SuppressWarnings("unchecked")
+ private final RecordComparator comparator1 = new RecordComparator(
+     new int[] {0}, (Class<? extends Key<?>>[]) new Class<?>[] {IntValue.class});
+
+ /** Comparator on field 0 (IntValue), registered as the second driver comparator. */
+ @SuppressWarnings("unchecked")
+ private final RecordComparator comparator2 = new RecordComparator(
+     new int[] {0}, (Class<? extends Key<?>>[]) new Class<?>[] {IntValue.class});
+
+ /** Collects the driver output of the tests that check the result size. */
+ private final List<Record> outList = new ArrayList<>();
+
+
+ public JoinTaskTest(ExecutionConfig config) {
+ super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
+ bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
+ hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
+ }
+
+
+ /**
+  * INNER_MERGE join with both inputs fed through {@code addInputSorted}
+  * (keyCnt1 = 20, valCnt1 = 1, keyCnt2 = 10, valCnt2 = 2).
+  * Expects {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} output records.
+  */
+ @Test
+ public void testSortBoth1MatchTask() {
+     final int keyCnt1 = 20;
+     final int valCnt1 = 1;
+
+     final int keyCnt2 = 10;
+     final int valCnt2 = 2;
+
+     setOutput(this.outList);
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+     getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+     setNumFileHandlesForSort(4);
+
+     final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+     try {
+         addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+         addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+         testDriver(testTask, MockMatchStub.class);
+     } catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail("The test caused an exception.");
+     }
+
+     // assertEquals reports expected and actual size itself; same form as the hash tests.
+     final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+     Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+
+     this.outList.clear();
+ }
+
+ /**
+  * INNER_MERGE join with both inputs fed through {@code addInputSorted}
+  * (keyCnt1 = 20, valCnt1 = 1, keyCnt2 = 20, valCnt2 = 1).
+  * Expects {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} output records.
+  */
+ @Test
+ public void testSortBoth2MatchTask() {
+     final int keyCnt1 = 20;
+     final int valCnt1 = 1;
+
+     final int keyCnt2 = 20;
+     final int valCnt2 = 1;
+
+     setOutput(this.outList);
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+     getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+     setNumFileHandlesForSort(4);
+
+     final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+     try {
+         addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+         addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+         testDriver(testTask, MockMatchStub.class);
+     } catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail("The test caused an exception.");
+     }
+
+     // assertEquals reports expected and actual size itself; same form as the hash tests.
+     final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+     Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+
+     this.outList.clear();
+ }
+
+ /**
+  * INNER_MERGE join with both inputs fed through {@code addInputSorted}
+  * (keyCnt1 = 20, valCnt1 = 1, keyCnt2 = 20, valCnt2 = 20).
+  * Expects {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} output records.
+  */
+ @Test
+ public void testSortBoth3MatchTask() {
+     final int keyCnt1 = 20;
+     final int valCnt1 = 1;
+
+     final int keyCnt2 = 20;
+     final int valCnt2 = 20;
+
+     setOutput(this.outList);
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+     getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+     setNumFileHandlesForSort(4);
+
+     final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+     try {
+         addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+         addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+         testDriver(testTask, MockMatchStub.class);
+     } catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail("The test caused an exception.");
+     }
+
+     // assertEquals reports expected and actual size itself; same form as the hash tests.
+     final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+     Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+
+     this.outList.clear();
+ }
+
+ /**
+  * INNER_MERGE join with both inputs fed through {@code addInputSorted}
+  * (keyCnt1 = 20, valCnt1 = 20, keyCnt2 = 20, valCnt2 = 1).
+  * Expects {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} output records.
+  */
+ @Test
+ public void testSortBoth4MatchTask() {
+     final int keyCnt1 = 20;
+     final int valCnt1 = 20;
+
+     final int keyCnt2 = 20;
+     final int valCnt2 = 1;
+
+     setOutput(this.outList);
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+     getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+     setNumFileHandlesForSort(4);
+
+     final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+     try {
+         addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+         addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+         testDriver(testTask, MockMatchStub.class);
+     } catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail("The test caused an exception.");
+     }
+
+     // assertEquals reports expected and actual size itself; same form as the hash tests.
+     final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+     Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+
+     this.outList.clear();
+ }
+
+ /**
+  * INNER_MERGE join with both inputs fed through {@code addInputSorted}
+  * (keyCnt1 = 20, valCnt1 = 20, keyCnt2 = 20, valCnt2 = 20).
+  * Expects {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} output records.
+  */
+ @Test
+ public void testSortBoth5MatchTask() {
+     final int keyCnt1 = 20;
+     final int valCnt1 = 20;
+
+     final int keyCnt2 = 20;
+     final int valCnt2 = 20;
+
+     setOutput(this.outList);
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+     getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+     setNumFileHandlesForSort(4);
+
+     final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+     try {
+         addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+         addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+         testDriver(testTask, MockMatchStub.class);
+     } catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail("The test caused an exception.");
+     }
+
+     // assertEquals reports expected and actual size itself; same form as the hash tests.
+     final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+     Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+
+     this.outList.clear();
+ }
+
+ /**
+  * INNER_MERGE join where only the first input is fed through {@code addInputSorted}; the
+  * second is added directly with the generator's boolean flag set to {@code true}.
+  * NOTE(review): that flag presumably makes the generator emit key-ordered data - confirm.
+  * Expects {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} output records.
+  */
+ @Test
+ public void testSortFirstMatchTask() {
+     final int keyCnt1 = 20;
+     final int valCnt1 = 20;
+
+     final int keyCnt2 = 20;
+     final int valCnt2 = 20;
+
+     setOutput(this.outList);
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+     getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+     setNumFileHandlesForSort(4);
+
+     final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+     try {
+         addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
+         addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
+         testDriver(testTask, MockMatchStub.class);
+     } catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail("The test caused an exception.");
+     }
+
+     // assertEquals reports expected and actual size itself; same form as the hash tests.
+     final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+     Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+
+     this.outList.clear();
+ }
+
+ /**
+  * INNER_MERGE join where only the second input is fed through {@code addInputSorted}; the
+  * first is added directly with the generator's boolean flag set to {@code true}.
+  * NOTE(review): that flag presumably makes the generator emit key-ordered data - confirm.
+  * Expects {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} output records.
+  */
+ @Test
+ public void testSortSecondMatchTask() {
+     final int keyCnt1 = 20;
+     final int valCnt1 = 20;
+
+     final int keyCnt2 = 20;
+     final int valCnt2 = 20;
+
+     setOutput(this.outList);
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+     getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+     setNumFileHandlesForSort(4);
+
+     final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+     try {
+         addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
+         addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
+         testDriver(testTask, MockMatchStub.class);
+     } catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail("The test caused an exception.");
+     }
+
+     // assertEquals reports expected and actual size itself; same form as the hash tests.
+     final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+     Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+
+     this.outList.clear();
+ }
+
+ /**
+  * INNER_MERGE join where neither input goes through {@code addInputSorted}; both are added
+  * directly with the generator's boolean flag set to {@code true}.
+  * Expects {@code valCnt1 * valCnt2 * min(keyCnt1, keyCnt2)} output records.
+  */
+ @Test
+ public void testMergeMatchTask() {
+     final int keyCnt1 = 20;
+     final int valCnt1 = 20;
+
+     final int keyCnt2 = 20;
+     final int valCnt2 = 20;
+
+     setOutput(this.outList);
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+     getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+     setNumFileHandlesForSort(4);
+
+     final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+     addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
+     addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
+
+     try {
+         testDriver(testTask, MockMatchStub.class);
+     } catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail("The test caused an exception.");
+     }
+
+     // assertEquals reports expected and actual size itself; same form as the hash tests.
+     final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
+     Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
+
+     this.outList.clear();
+ }
+
+ /**
+  * Runs an INNER_MERGE join with {@code MockFailingMatchStub} as the user function and
+  * requires that {@code testDriver} lets an {@link ExpectedTestException} escape.
+  */
+ @Test
+ public void testFailingMatchTask() {
+     final int numKeys1 = 20;
+     final int numVals1 = 20;
+
+     final int numKeys2 = 20;
+     final int numVals2 = 20;
+
+     setOutput(new NirvanaOutputList());
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+     getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+     setNumFileHandlesForSort(4);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+     addInput(new UniformRecordGenerator(numKeys1, numVals1, true));
+     addInput(new UniformRecordGenerator(numKeys2, numVals2, true));
+
+     try {
+         testDriver(driver, MockFailingMatchStub.class);
+         Assert.fail("Driver did not forward Exception.");
+     } catch (ExpectedTestException expected) {
+         // the exception thrown by the user function reached the caller, as required
+     } catch (Exception unexpected) {
+         unexpected.printStackTrace();
+         Assert.fail("The test caused an exception.");
+     }
+ }
+
+ @Test
+ public void testCancelMatchTaskWhileSort1() {
+ final int keyCnt = 20;
+ final int valCnt = 20;
+
+ try {
+ setOutput(new NirvanaOutputList());
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
+ getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+ getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+ getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+ setNumFileHandlesForSort(4);
+
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+ try {
+ addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test caused an exception.");
+ }
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") {
+ @Override
+ public void run() {
+ try {
+ testDriver(testTask, MockMatchStub.class);
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ taskRunner.start();
+
+ Thread.sleep(1000);
+
+ cancel();
+ taskRunner.interrupt();
+
+ taskRunner.join(60000);
+
+ assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+ Throwable taskError = error.get();
+ if (taskError != null) {
+ taskError.printStackTrace();
+ fail("Error in task while canceling: " + taskError.getMessage());
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+  * Cancels an INNER_MERGE join whose second input is a {@link DelayingInfinitiveInputIterator}
+  * fed through {@code addInputSorted}. The driver runs in its own thread; after one second the
+  * test calls {@code cancel()} and interrupts that thread, then requires it to end within
+  * 60 seconds without having recorded a Throwable.
+  */
+ @Test
+ public void testCancelMatchTaskWhileSort2() {
+     final int keyCnt = 20;
+     final int valCnt = 20;
+
+     try {
+         setOutput(new NirvanaOutputList());
+         addDriverComparator(this.comparator1);
+         addDriverComparator(this.comparator2);
+         getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+         getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+         getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+         setNumFileHandlesForSort(4);
+
+         final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+         try {
+             addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+             // The second input is sorted with a copy of comparator2, the comparator that is
+             // registered for the second driver input, as in the other tests of this class.
+             addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator2.duplicate());
+         } catch (Exception e) {
+             e.printStackTrace();
+             Assert.fail("The test caused an exception.");
+         }
+
+         // Anything thrown inside the driver thread is parked here and checked after the join.
+         final AtomicReference<Throwable> error = new AtomicReference<>();
+
+         Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") {
+             @Override
+             public void run() {
+                 try {
+                     testDriver(testTask, MockMatchStub.class);
+                 }
+                 catch (Throwable t) {
+                     error.set(t);
+                 }
+             }
+         };
+         taskRunner.start();
+
+         Thread.sleep(1000);
+
+         cancel();
+         taskRunner.interrupt();
+
+         taskRunner.join(60000);
+
+         assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+         Throwable taskError = error.get();
+         if (taskError != null) {
+             taskError.printStackTrace();
+             fail("Error in task while canceling: " + taskError.getMessage());
+         }
+     }
+     catch (Exception e) {
+         e.printStackTrace();
+         fail(e.getMessage());
+     }
+ }
+
+ @Test
+ public void testCancelMatchTaskWhileMatching() {
+ final int keyCnt = 20;
+ final int valCnt = 20;
+
+ try {
+ setOutput(new NirvanaOutputList());
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
+ getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+ getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
+ getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+ setNumFileHandlesForSort(4);
+
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") {
+ @Override
+ public void run() {
+ try {
+ testDriver(testTask, MockDelayingMatchStub.class);
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
+ }
+ };
+ taskRunner.start();
+
+ Thread.sleep(1000);
+
+ cancel();
+ taskRunner.interrupt();
+
+ taskRunner.join(60000);
+
+ assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+ Throwable taskError = error.get();
+ if (taskError != null) {
+ taskError.printStackTrace();
+ fail("Error in task while canceling: " + taskError.getMessage());
+ }
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+  * HYBRIDHASH_BUILD_FIRST join of 20 keys x 1 value with 10 keys x 2 values (generator
+  * arguments). Expects {@code vals1 * vals2 * min(keys1, keys2)} output records.
+  */
+ @Test
+ public void testHash1MatchTask() {
+     final int numKeys1 = 20;
+     final int numVals1 = 1;
+
+     final int numKeys2 = 10;
+     final int numVals2 = 2;
+
+     addInput(new UniformRecordGenerator(numKeys1, numVals1, false));
+     addInput(new UniformRecordGenerator(numKeys2, numVals2, false));
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     setOutput(this.outList);
+     getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+     getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+     try {
+         testDriver(driver, MockMatchStub.class);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail("Test caused an exception.");
+     }
+
+     final int expectedCount = numVals1 * numVals2 * Math.min(numKeys1, numKeys2);
+     Assert.assertEquals("Wrong result set size.", expectedCount, this.outList.size());
+     this.outList.clear();
+ }
+
+ /**
+  * HYBRIDHASH_BUILD_SECOND join of 20 keys x 1 value with 20 keys x 1 value (generator
+  * arguments). Expects {@code vals1 * vals2 * min(keys1, keys2)} output records.
+  */
+ @Test
+ public void testHash2MatchTask() {
+     final int numKeys1 = 20;
+     final int numVals1 = 1;
+
+     final int numKeys2 = 20;
+     final int numVals2 = 1;
+
+     addInput(new UniformRecordGenerator(numKeys1, numVals1, false));
+     addInput(new UniformRecordGenerator(numKeys2, numVals2, false));
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     setOutput(this.outList);
+     getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+     getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+     try {
+         testDriver(driver, MockMatchStub.class);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail("Test caused an exception.");
+     }
+
+     final int expectedCount = numVals1 * numVals2 * Math.min(numKeys1, numKeys2);
+     Assert.assertEquals("Wrong result set size.", expectedCount, this.outList.size());
+     this.outList.clear();
+ }
+
+ /**
+  * HYBRIDHASH_BUILD_FIRST join of 20 keys x 1 value with 20 keys x 20 values (generator
+  * arguments). Expects {@code vals1 * vals2 * min(keys1, keys2)} output records.
+  */
+ @Test
+ public void testHash3MatchTask() {
+     final int numKeys1 = 20;
+     final int numVals1 = 1;
+
+     final int numKeys2 = 20;
+     final int numVals2 = 20;
+
+     addInput(new UniformRecordGenerator(numKeys1, numVals1, false));
+     addInput(new UniformRecordGenerator(numKeys2, numVals2, false));
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     setOutput(this.outList);
+     getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+     getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+     try {
+         testDriver(driver, MockMatchStub.class);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail("Test caused an exception.");
+     }
+
+     final int expectedCount = numVals1 * numVals2 * Math.min(numKeys1, numKeys2);
+     Assert.assertEquals("Wrong result set size.", expectedCount, this.outList.size());
+     this.outList.clear();
+ }
+
+ /**
+  * HYBRIDHASH_BUILD_SECOND join of 20 keys x 20 values with 20 keys x 1 value (generator
+  * arguments). Expects {@code vals1 * vals2 * min(keys1, keys2)} output records.
+  */
+ @Test
+ public void testHash4MatchTask() {
+     final int numKeys1 = 20;
+     final int numVals1 = 20;
+
+     final int numKeys2 = 20;
+     final int numVals2 = 1;
+
+     addInput(new UniformRecordGenerator(numKeys1, numVals1, false));
+     addInput(new UniformRecordGenerator(numKeys2, numVals2, false));
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     setOutput(this.outList);
+     getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+     getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+     try {
+         testDriver(driver, MockMatchStub.class);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail("Test caused an exception.");
+     }
+
+     final int expectedCount = numVals1 * numVals2 * Math.min(numKeys1, numKeys2);
+     Assert.assertEquals("Wrong result set size.", expectedCount, this.outList.size());
+     this.outList.clear();
+ }
+
+ /**
+  * HYBRIDHASH_BUILD_FIRST join of 20 keys x 20 values with 20 keys x 20 values (generator
+  * arguments). Expects {@code vals1 * vals2 * min(keys1, keys2)} output records.
+  */
+ @Test
+ public void testHash5MatchTask() {
+     final int numKeys1 = 20;
+     final int numVals1 = 20;
+
+     final int numKeys2 = 20;
+     final int numVals2 = 20;
+
+     addInput(new UniformRecordGenerator(numKeys1, numVals1, false));
+     addInput(new UniformRecordGenerator(numKeys2, numVals2, false));
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     setOutput(this.outList);
+     getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+     getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+     try {
+         testDriver(driver, MockMatchStub.class);
+     } catch (Exception ex) {
+         ex.printStackTrace();
+         Assert.fail("Test caused an exception.");
+     }
+
+     final int expectedCount = numVals1 * numVals2 * Math.min(numKeys1, numKeys2);
+     Assert.assertEquals("Wrong result set size.", expectedCount, this.outList.size());
+     this.outList.clear();
+ }
+
+ /**
+  * Runs a HYBRIDHASH_BUILD_FIRST join with {@code MockFailingMatchStub} as the user function
+  * and requires that {@code testDriver} lets an {@link ExpectedTestException} escape.
+  */
+ @Test
+ public void testFailingHashFirstMatchTask() {
+     final int numKeys1 = 20;
+     final int numVals1 = 20;
+
+     final int numKeys2 = 20;
+     final int numVals2 = 20;
+
+     addInput(new UniformRecordGenerator(numKeys1, numVals1, false));
+     addInput(new UniformRecordGenerator(numKeys2, numVals2, false));
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     setOutput(new NirvanaOutputList());
+     getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+     getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+     try {
+         testDriver(driver, MockFailingMatchStub.class);
+         Assert.fail("Function exception was not forwarded.");
+     } catch (ExpectedTestException expected) {
+         // the exception thrown by the user function reached the caller, as required
+     } catch (Exception unexpected) {
+         unexpected.printStackTrace();
+         Assert.fail("Test caused an exception.");
+     }
+ }
+
+ /**
+  * Runs a HYBRIDHASH_BUILD_SECOND join with {@code MockFailingMatchStub} as the user function
+  * and requires that {@code testDriver} lets an {@link ExpectedTestException} escape.
+  */
+ @Test
+ public void testFailingHashSecondMatchTask() {
+     final int numKeys1 = 20;
+     final int numVals1 = 20;
+
+     final int numKeys2 = 20;
+     final int numVals2 = 20;
+
+     addInput(new UniformRecordGenerator(numKeys1, numVals1, false));
+     addInput(new UniformRecordGenerator(numKeys2, numVals2, false));
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+     setOutput(new NirvanaOutputList());
+     getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+     getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+
+     try {
+         testDriver(driver, MockFailingMatchStub.class);
+         Assert.fail("Function exception was not forwarded.");
+     } catch (ExpectedTestException expected) {
+         // the exception thrown by the user function reached the caller, as required
+     } catch (Exception unexpected) {
+         unexpected.printStackTrace();
+         Assert.fail("Test caused an exception.");
+     }
+ }
+
+ /**
+  * Cancels a HYBRIDHASH_BUILD_FIRST join whose first input is a
+  * {@link DelayingInfinitiveInputIterator}. The driver runs in its own thread; after one
+  * second the test calls {@code cancel()} and requires the thread to end within 60 seconds
+  * with {@code testDriver} having returned normally.
+  */
+ @Test
+ public void testCancelHashMatchTaskWhileBuildFirst() {
+     final int keyCnt = 20;
+     final int valCnt = 20;
+
+     try {
+         addInput(new DelayingInfinitiveInputIterator(100));
+         addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+
+         addDriverComparator(this.comparator1);
+         addDriverComparator(this.comparator2);
+
+         getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+         setOutput(new NirvanaOutputList());
+
+         getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+         getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+         final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+         // Flipped to true only if testDriver() returns normally.
+         final AtomicBoolean success = new AtomicBoolean(false);
+
+         Thread taskRunner = new Thread() {
+             @Override
+             public void run() {
+                 try {
+                     testDriver(testTask, MockMatchStub.class);
+                     success.set(true);
+                 } catch (Exception ie) {
+                     ie.printStackTrace();
+                 }
+             }
+         };
+         taskRunner.start();
+
+         Thread.sleep(1000);
+         cancel();
+
+         try {
+             // Bounded wait, as in the sort-merge cancel tests: a driver that ignores the
+             // cancel call fails this test instead of blocking the whole build.
+             taskRunner.join(60000);
+         }
+         catch (InterruptedException ie) {
+             Assert.fail("Joining threads failed");
+         }
+
+         Assert.assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+         Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+     }
+     catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail(e.getMessage());
+     }
+ }
+
+ /**
+  * Cancels a HYBRIDHASH_BUILD_SECOND join whose second input is a
+  * {@link DelayingInfinitiveInputIterator}. The driver runs in its own thread; after one
+  * second the test calls {@code cancel()} and requires the thread to end within 60 seconds
+  * with {@code testDriver} having returned normally.
+  */
+ @Test
+ public void testHashCancelMatchTaskWhileBuildSecond() {
+     final int keyCnt = 20;
+     final int valCnt = 20;
+
+     try {
+         addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
+         addInput(new DelayingInfinitiveInputIterator(100));
+
+         addDriverComparator(this.comparator1);
+         addDriverComparator(this.comparator2);
+
+         getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+         setOutput(new NirvanaOutputList());
+
+         getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+         getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+         final JoinDriver<Record, Record, Record> testTask = new JoinDriver<>();
+
+         // Flipped to true only if testDriver() returns normally.
+         final AtomicBoolean success = new AtomicBoolean(false);
+
+         Thread taskRunner = new Thread() {
+             @Override
+             public void run() {
+                 try {
+                     testDriver(testTask, MockMatchStub.class);
+                     success.set(true);
+                 } catch (Exception ie) {
+                     ie.printStackTrace();
+                 }
+             }
+         };
+         taskRunner.start();
+
+         Thread.sleep(1000);
+         cancel();
+
+         try {
+             // Bounded wait, as in the sort-merge cancel tests: a driver that ignores the
+             // cancel call fails this test instead of blocking the whole build.
+             taskRunner.join(60000);
+         }
+         catch (InterruptedException ie) {
+             Assert.fail("Joining threads failed");
+         }
+
+         Assert.assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+         Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+     }
+     catch (Exception e) {
+         e.printStackTrace();
+         Assert.fail(e.getMessage());
+     }
+ }
+
+ /**
+  * Runs a {@code HYBRIDHASH_BUILD_FIRST} join over two finite inputs in a background
+  * thread, hands that thread to a {@code TaskCancelThread}, and verifies that the
+  * driver invocation returned without throwing.
+  */
+ @Test
+ public void testHashFirstCancelMatchTaskWhileMatching() {
+     int numKeys = 20;
+     int numValues = 20;
+
+     addInput(new UniformRecordGenerator(numKeys, numValues, false));
+     addInput(new UniformRecordGenerator(numKeys, numValues, false));
+
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+     setOutput(new NirvanaOutputList());
+
+     getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
+     getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+     final AtomicBoolean finishedCleanly = new AtomicBoolean(false);
+
+     final Thread runner = new Thread(new Runnable() {
+         @Override
+         public void run() {
+             try {
+                 testDriver(driver, MockMatchStub.class);
+                 finishedCleanly.set(true);
+             } catch (Exception ex) {
+                 // The flag stays false, so the assertion at the end reports the failure.
+                 ex.printStackTrace();
+             }
+         }
+     });
+     runner.start();
+
+     // NOTE(review): presumably cancels this task after a delay of 1 (unit not
+     // visible here) - confirm against TaskCancelThread.
+     final TaskCancelThread canceller = new TaskCancelThread(1, runner, this);
+     canceller.start();
+
+     try {
+         canceller.join();
+         runner.join();
+     } catch (InterruptedException ex) {
+         Assert.fail("Joining threads failed");
+     }
+
+     Assert.assertTrue(
+         "Test threw an exception even though it was properly canceled.",
+         finishedCleanly.get());
+ }
+
+ /**
+  * Runs a {@code HYBRIDHASH_BUILD_SECOND} join over two finite inputs in a background
+  * thread, hands that thread to a {@code TaskCancelThread}, and verifies that the
+  * driver invocation returned without throwing.
+  */
+ @Test
+ public void testHashSecondCancelMatchTaskWhileMatching() {
+     int numKeys = 20;
+     int numValues = 20;
+
+     addInput(new UniformRecordGenerator(numKeys, numValues, false));
+     addInput(new UniformRecordGenerator(numKeys, numValues, false));
+
+     addDriverComparator(this.comparator1);
+     addDriverComparator(this.comparator2);
+     getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+
+     setOutput(new NirvanaOutputList());
+
+     getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
+     getTaskConfig().setRelativeMemoryDriver(hash_frac);
+
+     final JoinDriver<Record, Record, Record> driver = new JoinDriver<>();
+     final AtomicBoolean finishedCleanly = new AtomicBoolean(false);
+
+     final Thread runner = new Thread(new Runnable() {
+         @Override
+         public void run() {
+             try {
+                 testDriver(driver, MockMatchStub.class);
+                 finishedCleanly.set(true);
+             } catch (Exception ex) {
+                 // The flag stays false, so the assertion at the end reports the failure.
+                 ex.printStackTrace();
+             }
+         }
+     });
+     runner.start();
+
+     // NOTE(review): presumably cancels this task after a delay of 1 (unit not
+     // visible here) - confirm against TaskCancelThread.
+     final TaskCancelThread canceller = new TaskCancelThread(1, runner, this);
+     canceller.start();
+
+     try {
+         canceller.join();
+         runner.join();
+     } catch (InterruptedException ex) {
+         Assert.fail("Joining threads failed");
+     }
+
+     Assert.assertTrue(
+         "Test threw an exception even though it was properly canceled.",
+         finishedCleanly.get());
+ }
+
+ // =================================================================================================
+
+ public static final class MockMatchStub implements FlatJoinFunction<Record, Record, Record> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
+ out.collect(record1);
+ }
+ }
+
+ public static final class MockFailingMatchStub implements FlatJoinFunction<Record, Record, Record> {
+ private static final long serialVersionUID = 1L;
+
+ private int cnt = 0;
+
+ @Override
+ public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
+ if (++this.cnt >= 10) {
+ throw new ExpectedTestException();
+ }
+ out.collect(record1);
+ }
+ }
+
+ public static final class MockDelayingMatchStub implements FlatJoinFunction<Record, Record, Record> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
deleted file mode 100644
index bfc6c44..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MapTaskTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Assert;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class MapTaskTest extends DriverTestBase<GenericCollectorMap<Record, Record>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(MapTaskTest.class);
-
- private final CountingOutputCollector output = new CountingOutputCollector();
-
-
- public MapTaskTest(ExecutionConfig config) {
- super(config, 0, 0);
- }
-
- @Test
- public void testMapTask() {
- final int keyCnt = 100;
- final int valCnt = 20;
-
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- setOutput(this.output);
-
- final CollectorMapDriver<Record, Record> testDriver = new CollectorMapDriver<Record, Record>();
-
- try {
- testDriver(testDriver, MockMapStub.class);
- } catch (Exception e) {
- LOG.debug("Exception while running the test driver.", e);
- Assert.fail("Invoke method caused exception.");
- }
-
- Assert.assertEquals("Wrong result set size.", keyCnt*valCnt, this.output.getNumberOfRecords());
- }
-
- @Test
- public void testFailingMapTask() {
- final int keyCnt = 100;
- final int valCnt = 20;
-
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- setOutput(new DiscardingOutputCollector<Record>());
-
- final CollectorMapDriver<Record, Record> testTask = new CollectorMapDriver<Record, Record>();
- try {
- testDriver(testTask, MockFailingMapStub.class);
- Assert.fail("Function exception was not forwarded.");
- } catch (ExpectedTestException e) {
- // good!
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Exception in test.");
- }
- }
-
- @Test
- public void testCancelMapTask() {
- addInput(new InfiniteInputIterator());
- setOutput(new DiscardingOutputCollector<Record>());
-
- final CollectorMapDriver<Record, Record> testTask = new CollectorMapDriver<Record, Record>();
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- final Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMapStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
- }
- }
- };
- taskRunner.start();
-
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
- tct.start();
-
- try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
- }
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
- }
-
- public static class MockMapStub extends MapFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- out.collect(record);
- }
-
- }
-
- public static class MockFailingMapStub extends MapFunction {
- private static final long serialVersionUID = 1L;
-
- private int cnt = 0;
-
- @Override
- public void map(Record record, Collector<Record> out) throws Exception {
- if (++this.cnt >= 10) {
- throw new ExpectedTestException();
- }
- out.collect(record);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
deleted file mode 100644
index 6f7fb21..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskExternalITCase.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.junit.Assert;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-@SuppressWarnings("deprecation")
-public class MatchTaskExternalITCase extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
-
- private static final long HASH_MEM = 4*1024*1024;
-
- private static final long SORT_MEM = 3*1024*1024;
-
- private static final long BNLJN_MEM = 10 * PAGE_SIZE;
-
- private final double bnljn_frac;
-
- private final double hash_frac;
-
- @SuppressWarnings("unchecked")
- private final RecordComparator comparator1 = new RecordComparator(
- new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
-
- @SuppressWarnings("unchecked")
- private final RecordComparator comparator2 = new RecordComparator(
- new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
-
- private final CountingOutputCollector output = new CountingOutputCollector();
-
- public MatchTaskExternalITCase(ExecutionConfig config) {
- super(config, HASH_MEM, 2, SORT_MEM);
- bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
- hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
- }
-
- @Test
- public void testExternalSort1MatchTask() {
- final int keyCnt1 = 16384*4;
- final int valCnt1 = 2;
-
- final int keyCnt2 = 8192;
- final int valCnt2 = 4*2;
-
- final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- setOutput(this.output);
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
- addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
- }
-
- @Test
- public void testExternalHash1MatchTask() {
- final int keyCnt1 = 32768;
- final int valCnt1 = 8;
-
- final int keyCnt2 = 65536;
- final int valCnt2 = 8;
-
- final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(this.output);
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test caused an exception.");
- }
-
- Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
- }
-
- @Test
- public void testExternalHash2MatchTask() {
- final int keyCnt1 = 32768;
- final int valCnt1 = 8;
-
- final int keyCnt2 = 65536;
- final int valCnt2 = 8;
-
- final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(this.output);
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test caused an exception.");
- }
-
- Assert.assertEquals("Wrong result set size.", expCnt, this.output.getNumberOfRecords());
- }
-
- public static final class MockMatchStub extends JoinFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
- out.collect(value1);
- }
- }
-}