You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/13 17:11:57 UTC
[2/3] flink git commit: [FLINK-1085] [tests] Make the combiner tests
generic. Add more coverage for oversized records.
[FLINK-1085] [tests] Make the combiner tests generic. Add more coverage for oversized records.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/01c74338
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/01c74338
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/01c74338
Branch: refs/heads/master
Commit: 01c74338ff44ea7f3735a7eb94b2ce01ababc505
Parents: 7271881
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jul 13 15:12:04 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jul 13 16:29:38 2015 +0200
----------------------------------------------------------------------
.../operators/GroupReduceCombineDriver.java | 60 +--
.../operators/CombineTaskExternalITCase.java | 87 +++-
.../runtime/operators/CombineTaskTest.java | 354 ++++++++--------
.../operators/CombinerOversizedRecordsTest.java | 236 +++++++++++
.../operators/testutils/DelayingIterator.java | 59 +++
.../testutils/InfiniteIntTupleIterator.java | 38 ++
.../runtime/operators/testutils/TestData.java | 2 +-
.../testutils/UnaryOperatorTestBase.java | 410 +++++++++++++++++++
.../UniformIntStringTupleGenerator.java | 77 ++++
.../testutils/UniformIntTupleGenerator.java | 75 ++++
.../operators/testutils/UnionIterator.java | 16 +-
11 files changed, 1199 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
index c426295..2bf778e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/GroupReduceCombineDriver.java
@@ -16,13 +16,9 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -33,10 +29,15 @@ import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
+import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
import java.util.List;
/**
@@ -44,11 +45,12 @@ import java.util.List;
* the user supplied a RichGroupReduceFunction with a combine method. The combining is performed in memory with a
* lazy approach which only combines elements which currently fit in the sorter. This may lead to a partial solution.
* In the case of the RichGroupReduceFunction this partial result is transformed into a proper deterministic result.
- * The CombineGroup uses the GroupCombineFunction interface which allows to combine values of type <IN> to any type
- * of type <OUT>. In contrast, the RichGroupReduceFunction requires the combine method to have the same input and
- * output type to be able to reduce the elements after the combine from <IN> to <OUT>.
+ * The CombineGroup uses the GroupCombineFunction interface, which allows combining values of type {@code IN}
+ * into values of type {@code OUT}. In contrast, the RichGroupReduceFunction requires the combine method
+ * to have the same input and output type, so that the elements can still be reduced from
+ * {@code IN} to {@code OUT} after the combine step.
*
- * The CombineTask uses a combining iterator over its input. The output of the iterator is emitted.
+ * <p>The CombineTask uses a combining iterator over its input. The output of the iterator is emitted.</p>
*
* @param <IN> The data type consumed by the combiner.
* @param <OUT> The data type produced by the combiner.
@@ -67,8 +69,6 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
private GroupCombineFunction<IN, OUT> combiner;
private TypeSerializer<IN> serializer;
-
- private TypeComparator<IN> sortingComparator;
private TypeComparator<IN> groupingComparator;
@@ -78,7 +78,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
private Collector<OUT> output;
- private long oversizedRecordCount = 0L;
+ private long oversizedRecordCount;
private volatile boolean running = true;
@@ -112,9 +112,8 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
@Override
public void prepare() throws Exception {
final DriverStrategy driverStrategy = this.taskContext.getTaskConfig().getDriverStrategy();
- if(driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
- throw new Exception("Invalid strategy " + driverStrategy + " for " +
- "group reduce combinder.");
+ if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE){
+ throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combiner.");
}
this.memManager = this.taskContext.getMemoryManager();
@@ -122,7 +121,9 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
this.serializer = serializerFactory.getSerializer();
- this.sortingComparator = this.taskContext.getDriverComparator(0);
+
+ final TypeComparator<IN> sortingComparator = this.taskContext.getDriverComparator(0);
+
this.groupingComparator = this.taskContext.getDriverComparator(1);
this.combiner = this.taskContext.getStub();
this.output = this.taskContext.getOutputCollector();
@@ -131,12 +132,12 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
numMemoryPages);
// instantiate a fix-length in-place sorter, if possible, otherwise the out-of-place sorter
- if (this.sortingComparator.supportsSerializationWithKeyNormalization() &&
+ if (sortingComparator.supportsSerializationWithKeyNormalization() &&
this.serializer.getLength() > 0 && this.serializer.getLength() <= THRESHOLD_FOR_IN_PLACE_SORTING)
{
- this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, this.sortingComparator, memory);
+ this.sorter = new FixedLengthRecordSorter<IN>(this.serializer, sortingComparator, memory);
} else {
- this.sorter = new NormalizedKeySorter<IN>(this.serializer, this.sortingComparator.duplicate(), memory);
+ this.sorter = new NormalizedKeySorter<IN>(this.serializer, sortingComparator.duplicate(), memory);
}
ExecutionConfig executionConfig = taskContext.getExecutionConfig();
@@ -171,10 +172,14 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
// write the value again
if (!this.sorter.write(value)) {
+
++oversizedRecordCount;
- LOG.debug("Cannot write record to fresh sort buffer. Record too large. Oversized record count: {}", oversizedRecordCount);
- // simply forward the record
- this.output.collect((OUT)value);
+ LOG.debug("Cannot write record to fresh sort buffer, record is too large. " +
+ "Oversized record count: {}", oversizedRecordCount);
+
+ // forward the record on its own. We need to pass it through the combine function to convert it from IN to OUT
+ Iterable<IN> input = Collections.singleton(value);
+ this.combiner.combine(input, this.output);
}
}
@@ -210,7 +215,7 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
@Override
public void cleanup() throws Exception {
- if(this.sorter != null) {
+ if (this.sorter != null) {
this.memManager.release(this.sorter.dispose());
}
}
@@ -218,8 +223,17 @@ public class GroupReduceCombineDriver<IN, OUT> implements PactDriver<GroupCombin
@Override
public void cancel() {
this.running = false;
- if(this.sorter != null) {
+ if (this.sorter != null) {
this.memManager.release(this.sorter.dispose());
}
}
+
+ /**
+ * Gets the number of oversized records handled by this combiner.
+ *
+ * @return The number of oversized records handled by this combiner.
+ */
+ public long getOversizedRecordCount() {
+ return oversizedRecordCount;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
index d957fa1..4905e57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskExternalITCase.java
@@ -16,17 +16,18 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import java.util.ArrayList;
import java.util.HashMap;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.runtime.operators.CombineTaskTest.MockCombiningReduceStub;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
@@ -45,7 +46,7 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
@SuppressWarnings("unchecked")
private final RecordComparator comparator = new RecordComparator(
- new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+ new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
public CombineTaskExternalITCase(ExecutionConfig config) {
super(config, COMBINE_MEM, 0);
@@ -161,4 +162,84 @@ public class CombineTaskExternalITCase extends DriverTestBase<RichGroupReduceFun
this.outList.clear();
}
+
+ // ------------------------------------------------------------------------
+ // ------------------------------------------------------------------------
+
+ @ReduceOperator.Combinable
+ public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+ private static final long serialVersionUID = 1L;
+
+ private final IntValue theInteger = new IntValue();
+
+ @Override
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
+ Record element = null;
+ int sum = 0;
+
+ for (Record next : records) {
+ element = next;
+ element.getField(1, this.theInteger);
+
+ sum += this.theInteger.getValue();
+ }
+ this.theInteger.setValue(sum);
+ element.setField(1, this.theInteger);
+ out.collect(element);
+ }
+
+ @Override
+ public void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
+ reduce(records, out);
+ }
+ }
+
+ @ReduceOperator.Combinable
+ public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+ private static final long serialVersionUID = 1L;
+
+ private int cnt = 0;
+
+ private final IntValue key = new IntValue();
+ private final IntValue value = new IntValue();
+ private final IntValue combineValue = new IntValue();
+
+ @Override
+ public void reduce(Iterable<Record> records, Collector<Record> out) {
+ Record element = null;
+ int sum = 0;
+
+ for (Record next : records) {
+ element = next;
+ element.getField(1, this.value);
+
+ sum += this.value.getValue();
+ }
+ element.getField(0, this.key);
+ this.value.setValue(sum - this.key.getValue());
+ element.setField(1, this.value);
+ out.collect(element);
+ }
+
+ @Override
+ public void combine(Iterable<Record> records, Collector<Record> out) {
+ Record element = null;
+ int sum = 0;
+
+ for (Record next : records) {
+ element = next;
+ element.getField(1, this.combineValue);
+
+ sum += this.combineValue.getValue();
+ }
+
+ if (++this.cnt >= 10) {
+ throw new ExpectedTestException();
+ }
+
+ this.combineValue.setValue(sum);
+ element.setField(1, this.combineValue);
+ out.collect(element);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
index 7772151..932e746 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombineTaskTest.java
@@ -18,254 +18,244 @@
package org.apache.flink.runtime.operators;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.runtime.operators.testutils.*;
-import org.junit.Assert;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.operators.testutils.DelayingIterator;
+import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
+import org.apache.flink.runtime.operators.testutils.InfiniteIntTupleIterator;
+import org.apache.flink.runtime.operators.testutils.UnaryOperatorTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
-import org.apache.flink.runtime.operators.testutils.TestData.Generator;
+
import org.junit.Test;
-public class CombineTaskTest extends DriverTestBase<RichGroupReduceFunction<Record, ?>>
-{
+import java.util.ArrayList;
+
+import static org.junit.Assert.*;
+
+public class CombineTaskTest
+ extends UnaryOperatorTestBase<RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>,
+ Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
+
private static final long COMBINE_MEM = 3 * 1024 * 1024;
private final double combine_frac;
- private final ArrayList<Record> outList = new ArrayList<Record>();
+ private final ArrayList<Tuple2<Integer, Integer>> outList = new ArrayList<Tuple2<Integer, Integer>>();
+
+ private final TypeSerializer<Tuple2<Integer, Integer>> serializer = new TupleSerializer<Tuple2<Integer, Integer>>(
+ (Class<Tuple2<Integer, Integer>>) (Class<?>) Tuple2.class,
+ new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE });
- @SuppressWarnings("unchecked")
- private final RecordComparator comparator = new RecordComparator(
- new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+ private final TypeComparator<Tuple2<Integer, Integer>> comparator = new TupleComparator<Tuple2<Integer, Integer>>(
+ new int[]{0},
+ new TypeComparator<?>[] { new IntComparator(true) },
+ new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+
public CombineTaskTest(ExecutionConfig config) {
super(config, COMBINE_MEM, 0);
- combine_frac = (double)COMBINE_MEM/this.getMemoryManager().getMemorySize();
+ combine_frac = (double)COMBINE_MEM / this.getMemoryManager().getMemorySize();
}
+
@Test
public void testCombineTask() {
- int keyCnt = 100;
- int valCnt = 20;
-
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addDriverComparator(this.comparator);
- addDriverComparator(this.comparator);
- setOutput(this.outList);
-
- getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
- getTaskConfig().setRelativeMemoryDriver(combine_frac);
- getTaskConfig().setFilehandlesDriver(2);
-
- final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
-
try {
+ int keyCnt = 100;
+ int valCnt = 20;
+
+ setInput(new UniformIntTupleGenerator(keyCnt, valCnt, false), serializer);
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ setOutput(this.outList, serializer);
+
+ getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+ getTaskConfig().setRelativeMemoryDriver(combine_frac);
+ getTaskConfig().setFilehandlesDriver(2);
+
+ final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask =
+ new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+
testDriver(testTask, MockCombiningReduceStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Invoke method caused exception.");
- }
-
- int expSum = 0;
- for (int i = 1;i < valCnt; i++) {
- expSum += i;
- }
-
- Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+keyCnt, this.outList.size() == keyCnt);
-
- for(Record record : this.outList) {
- Assert.assertTrue("Incorrect result", record.getField(1, IntValue.class).getValue() == expSum);
+
+ int expSum = 0;
+ for (int i = 1;i < valCnt; i++) {
+ expSum += i;
+ }
+
+ assertTrue(this.outList.size() == keyCnt);
+
+ for (Tuple2<Integer, Integer> record : this.outList) {
+ assertTrue(record.f1 == expSum);
+ }
+
+ this.outList.clear();
}
-
- this.outList.clear();
- }
-
- @Test
- public void testOversizedRecordCombineTask() {
- int tenMil = 10000000;
- Generator g = new Generator(561349061987311L, 1, tenMil);
- //generate 10 records each of size 10MB
- final TestData.GeneratorIterator gi = new TestData.GeneratorIterator(g, 10);
- List<MutableObjectIterator<Record>> inputs = new ArrayList<MutableObjectIterator<Record>>();
- inputs.add(gi);
-
- addInput(new UnionIterator<Record>(inputs));
- addDriverComparator(this.comparator);
- addDriverComparator(this.comparator);
- setOutput(this.outList);
-
- getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
- getTaskConfig().setRelativeMemoryDriver(combine_frac);
- getTaskConfig().setFilehandlesDriver(2);
-
- final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
-
- try {
- testDriver(testTask, MockCombiningReduceStub.class);
- } catch (Exception e) {
+ catch (Exception e) {
e.printStackTrace();
- Assert.fail("Invoke method caused exception.");
+ fail(e.getMessage());
}
-
- Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+10, this.outList.size() == 10);
-
- this.outList.clear();
}
@Test
public void testFailingCombineTask() {
- int keyCnt = 100;
- int valCnt = 20;
-
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addDriverComparator(this.comparator);
- addDriverComparator(this.comparator);
- setOutput(new DiscardingOutputCollector<Record>());
-
- getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
- getTaskConfig().setRelativeMemoryDriver(combine_frac);
- getTaskConfig().setFilehandlesDriver(2);
-
- final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
-
try {
- testDriver(testTask, MockFailingCombiningReduceStub.class);
- Assert.fail("Exception not forwarded.");
- } catch (ExpectedTestException etex) {
- // good!
- } catch (Exception e) {
+ int keyCnt = 100;
+ int valCnt = 20;
+
+ setInput(new UniformIntTupleGenerator(keyCnt, valCnt, false), serializer);
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+
+ getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+ getTaskConfig().setRelativeMemoryDriver(combine_frac);
+ getTaskConfig().setFilehandlesDriver(2);
+
+ final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask =
+ new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+
+ try {
+ testDriver(testTask, MockFailingCombiningReduceStub.class);
+ fail("Exception not forwarded.");
+ }
+ catch (ExpectedTestException etex) {
+ // good!
+ }
+ }
+ catch (Exception e) {
e.printStackTrace();
- Assert.fail("Test failed due to an exception.");
+ fail(e.getMessage());
}
}
@Test
- public void testCancelCombineTaskSorting()
- {
- addInput(new DelayingInfinitiveInputIterator(100));
- addDriverComparator(this.comparator);
- addDriverComparator(this.comparator);
- setOutput(new DiscardingOutputCollector<Record>());
-
- getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
- getTaskConfig().setRelativeMemoryDriver(combine_frac);
- getTaskConfig().setFilehandlesDriver(2);
-
- final GroupReduceCombineDriver<Record, Record> testTask = new GroupReduceCombineDriver<Record, Record>();
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockFailingCombiningReduceStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
- }
- }
- };
- taskRunner.start();
-
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
- tct.start();
-
+ public void testCancelCombineTaskSorting() {
try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
+ MutableObjectIterator<Tuple2<Integer, Integer>> slowInfiniteInput =
+ new DelayingIterator<Tuple2<Integer, Integer>>(new InfiniteIntTupleIterator(), 1);
+
+ setInput(slowInfiniteInput, serializer);
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ setOutput(new DiscardingOutputCollector<Tuple2<Integer, Integer>>());
+
+ getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+ getTaskConfig().setRelativeMemoryDriver(combine_frac);
+ getTaskConfig().setFilehandlesDriver(2);
+
+ final GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> testTask =
+ new GroupReduceCombineDriver<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>();
+
+ Thread taskRunner = new Thread() {
+ @Override
+ public void run() {
+ try {
+ testDriver(testTask, MockFailingCombiningReduceStub.class);
+ }
+ catch (Exception e) {
+ // exceptions may happen during canceling
+ }
+ }
+ };
+ taskRunner.start();
+
+ // give the task some time
+ Thread.sleep(500);
+
+ // cancel
+ testTask.cancel();
+
+ // make sure it reacts to the canceling in some time
+ taskRunner.join(5000);
+
+ assertFalse("Task did not cancel properly within in 5 seconds.", taskRunner.isAlive());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
}
-
- Assert.assertTrue("Exception was thrown despite proper canceling.", success.get());
}
- @Combinable
- public static class MockCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+ // ------------------------------------------------------------------------
+ // Test Combiners
+ // ------------------------------------------------------------------------
+
+ @RichGroupReduceFunction.Combinable
+ public static class MockCombiningReduceStub extends
+ RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>
+ {
private static final long serialVersionUID = 1L;
-
- private final IntValue theInteger = new IntValue();
@Override
- public void reduce(Iterable<Record> records, Collector<Record> out) {
- Record element = null;
+ public void reduce(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
+ int key = 0;
int sum = 0;
-
- for (Record next : records) {
- element = next;
- element.getField(1, this.theInteger);
-
- sum += this.theInteger.getValue();
+
+ for (Tuple2<Integer, Integer> next : records) {
+ key = next.f0;
+ sum += next.f1;
}
- this.theInteger.setValue(sum);
- element.setField(1, this.theInteger);
- out.collect(element);
+
+ out.collect(new Tuple2<Integer, Integer>(key, sum));
}
@Override
- public void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
+ public void combine(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
reduce(records, out);
}
}
- @Combinable
- public static final class MockFailingCombiningReduceStub extends RichGroupReduceFunction<Record, Record> {
+ @RichGroupReduceFunction.Combinable
+ public static final class MockFailingCombiningReduceStub extends
+ RichGroupReduceFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>
+ {
private static final long serialVersionUID = 1L;
- private int cnt = 0;
-
- private final IntValue key = new IntValue();
- private final IntValue value = new IntValue();
- private final IntValue combineValue = new IntValue();
+ private int cnt;
@Override
- public void reduce(Iterable<Record> records, Collector<Record> out) {
- Record element = null;
+ public void reduce(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
+ int key = 0;
int sum = 0;
- for (Record next : records) {
- element = next;
- element.getField(1, this.value);
-
- sum += this.value.getValue();
+ for (Tuple2<Integer, Integer> next : records) {
+ key = next.f0;
+ sum += next.f1;
}
- element.getField(0, this.key);
- this.value.setValue(sum - this.key.getValue());
- element.setField(1, this.value);
- out.collect(element);
+
+ int resultValue = sum - key;
+ out.collect(new Tuple2<Integer, Integer>(key, resultValue));
}
@Override
- public void combine(Iterable<Record> records, Collector<Record> out) {
- Record element = null;
+ public void combine(Iterable<Tuple2<Integer, Integer>> records, Collector<Tuple2<Integer, Integer>> out) {
+ int key = 0;
int sum = 0;
-
- for (Record next : records) {
- element = next;
- element.getField(1, this.combineValue);
-
- sum += this.combineValue.getValue();
+
+ for (Tuple2<Integer, Integer> next : records) {
+ key = next.f0;
+ sum += next.f1;
}
if (++this.cnt >= 10) {
throw new ExpectedTestException();
}
-
- this.combineValue.setValue(sum);
- element.setField(1, this.combineValue);
- out.collect(element);
+
+ int resultValue = sum - key;
+ out.collect(new Tuple2<Integer, Integer>(key, resultValue));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
new file mode 100644
index 0000000..58d1676
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/CombinerOversizedRecordsTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.GroupCombineFunction;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.IntComparator;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.runtime.operators.testutils.UnaryOperatorTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator;
+import org.apache.flink.runtime.operators.testutils.UnionIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test that checks how the combiner handles very large records that are too large to be written into
+ * a fresh sort buffer.
+ */
+public class CombinerOversizedRecordsTest
+ extends UnaryOperatorTestBase<GroupCombineFunction<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>,
+ Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>> {
+
+ private static final long COMBINE_MEM = 3 * 1024 * 1024;
+
+ private final double combine_frac;
+
+ private final ArrayList<Tuple3<Integer, Double, String>> outList = new ArrayList<Tuple3<Integer, Double, String>>();
+
+ private final TypeSerializer<Tuple3<Integer, Integer, String>> serializer =
+ new TupleSerializer<Tuple3<Integer, Integer, String>>(
+ (Class<Tuple3<Integer, Integer, String>>) (Class<?>) Tuple3.class,
+ new TypeSerializer<?>[] { IntSerializer.INSTANCE, IntSerializer.INSTANCE, StringSerializer.INSTANCE });
+
+ private final TypeSerializer<Tuple3<Integer, Double, String>> outSerializer =
+ new TupleSerializer<Tuple3<Integer, Double, String>>(
+ (Class<Tuple3<Integer, Double, String>>) (Class<?>) Tuple3.class,
+ new TypeSerializer<?>[] { IntSerializer.INSTANCE, DoubleSerializer.INSTANCE, StringSerializer.INSTANCE });
+
+ private final TypeComparator<Tuple3<Integer, Integer, String>> comparator =
+ new TupleComparator<Tuple3<Integer, Integer, String>>(
+ new int[] { 0 },
+ new TypeComparator<?>[] { new IntComparator(true) },
+ new TypeSerializer<?>[] { IntSerializer.INSTANCE });
+
+ // ------------------------------------------------------------------------
+
+ public CombinerOversizedRecordsTest(ExecutionConfig config) {
+ super(config, COMBINE_MEM, 0);
+ combine_frac = (double)COMBINE_MEM / getMemoryManager().getMemorySize();
+ }
+
+ @Test
+ public void testOversizedRecordCombineTask() {
+ try {
+ final int keyCnt = 100;
+ final int valCnt = 20;
+
+ // create a long heavy string payload (10,000,000 random lowercase letters)
+ StringBuilder bld = new StringBuilder(10 * 1024 * 1024);
+ Random rnd = new Random();
+
+ for (int i = 0; i < 10000000; i++) {
+ bld.append((char) (rnd.nextInt(26) + 'a'));
+ }
+
+ String longString = bld.toString();
+ bld = null;
+
+ // construct the input as a union of
+ // 1) long string
+ // 2) generated (key, value) tuples, each extended with a short string
+ // 3) long string
+ // 4) generated (key, value) tuples, each extended with a short string
+ // 5) long string
+
+ // generated values 1 (keyCnt keys for each of valCnt values, key changing fastest)
+ MutableObjectIterator<Tuple2<Integer, Integer>> gen1 =
+ new UniformIntTupleGenerator(keyCnt, valCnt, false);
+
+ // generated values 2 (same sequence as the first generator)
+ MutableObjectIterator<Tuple2<Integer, Integer>> gen2 =
+ new UniformIntTupleGenerator(keyCnt, valCnt, false);
+
+ @SuppressWarnings("unchecked")
+ MutableObjectIterator<Tuple3<Integer, Integer, String>> input =
+ new UnionIterator<Tuple3<Integer, Integer, String>>(
+ new SingleValueIterator<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(-1, -1, longString)),
+ new StringIteratorDecorator(gen1),
+ new SingleValueIterator<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(-1, -1, longString)),
+ new StringIteratorDecorator(gen2),
+ new SingleValueIterator<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(-1, -1, longString)));
+
+ setInput(input, serializer);
+ addDriverComparator(this.comparator);
+ addDriverComparator(this.comparator);
+ setOutput(this.outList, this.outSerializer);
+
+ getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
+ getTaskConfig().setRelativeMemoryDriver(combine_frac);
+ getTaskConfig().setFilehandlesDriver(2);
+
+ GroupReduceCombineDriver<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>> testTask =
+ new GroupReduceCombineDriver<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>>();
+
+ testDriver(testTask, TestCombiner.class);
+ // three oversized records are expected; the output size is asserted as 3 plus keyCnt or 2 * keyCnt results
+ assertEquals(3, testTask.getOversizedRecordCount());
+ assertTrue(keyCnt + 3 == outList.size() || 2*keyCnt + 3 == outList.size());
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Combiner under test: per group, emits the key and string of the last record
+ * seen together with the sum of all f1 values (as a double).
+ */
+ public static final class TestCombiner
+ implements GroupCombineFunction<Tuple3<Integer, Integer, String>, Tuple3<Integer, Double, String>> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void combine(Iterable<Tuple3<Integer, Integer, String>> values,
+ Collector<Tuple3<Integer, Double, String>> out) {
+ int lastKey = 0;
+ int total = 0;
+ String lastPayload = null;
+ for (Tuple3<Integer, Integer, String> record : values) {
+ lastKey = record.f0;
+ total += record.f1;
+ lastPayload = record.f2;
+ }
+ out.collect(new Tuple3<Integer, Double, String>(lastKey, (double) total, lastPayload));
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Adapts an iterator of (int, int) tuples into an iterator of (int, int, String)
+ * tuples by attaching the constant string "test string" to every element.
+ */
+ private static class StringIteratorDecorator implements MutableObjectIterator<Tuple3<Integer, Integer, String>> {
+
+ private final MutableObjectIterator<Tuple2<Integer, Integer>> input;
+
+ private StringIteratorDecorator(MutableObjectIterator<Tuple2<Integer, Integer>> input) {
+ this.input = input;
+ }
+
+ @Override
+ public Tuple3<Integer, Integer, String> next(Tuple3<Integer, Integer, String> reuse) throws IOException {
+ final Tuple2<Integer, Integer> pair = input.next();
+ if (pair == null) {
+ // the wrapped iterator is exhausted
+ return null;
+ }
+ reuse.f0 = pair.f0;
+ reuse.f1 = pair.f1;
+ reuse.f2 = "test string";
+ return reuse;
+ }
+
+ @Override
+ public Tuple3<Integer, Integer, String> next() throws IOException {
+ final Tuple2<Integer, Integer> pair = input.next();
+ return pair == null
+ ? null
+ : new Tuple3<Integer, Integer, String>(pair.f0, pair.f1, "test string");
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Iterator that hands out the given value exactly once and returns {@code null}
+ * on every later call. The reuse argument is ignored.
+ */
+ private static class SingleValueIterator<T> implements MutableObjectIterator<T> {
+
+ private final T value;
+ private boolean pending = true;
+
+ private SingleValueIterator(T value) {
+ this.value = value;
+ }
+
+ @Override
+ public T next(T reuse) {
+ return next();
+ }
+
+ @Override
+ public T next() {
+ final boolean emit = pending;
+ pending = false;
+ return emit ? value : null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
new file mode 100644
index 0000000..b3d53c7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DelayingIterator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+
+/** Wraps an iterator and sleeps for {@code delay} milliseconds before every call is passed on to it. */
+public class DelayingIterator<T> implements MutableObjectIterator<T> {
+
+ private final MutableObjectIterator<T> iterator;
+ private final int delay;
+
+ public DelayingIterator(MutableObjectIterator<T> iterator, int delay) {
+ this.iterator = iterator;
+ this.delay = delay;
+ }
+
+ @Override
+ public T next(T reuse) throws IOException {
+ pause();
+ return iterator.next(reuse);
+ }
+
+ @Override
+ public T next() throws IOException {
+ pause();
+ return iterator.next();
+ }
+
+ /**
+ * Sleeps for the configured delay. An interrupt ends the sleep early; the
+ * interrupted flag is set again so that callers can still observe it.
+ */
+ private void pause() {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
new file mode 100644
index 0000000..ba2181b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/InfiniteIntTupleIterator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Endless iterator: each call returns a newly created (0, 0) tuple; any reuse object passed in is ignored.
+ */
+public class InfiniteIntTupleIterator implements MutableObjectIterator<Tuple2<Integer, Integer>> {
+
+ @Override
+ public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> reuse) {
+ return next();
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> next() {
+ return new Tuple2<Integer, Integer>(0, 0);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
index 400e798..fd34a3b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TestData.java
@@ -226,7 +226,7 @@ public final class TestData {
length = valueLength - random.nextInt(valueLength / 3);
}
- StringBuffer sb = new StringBuffer();
+ StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++) {
sb.append(alpha[random.nextInt(alpha.length)]);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
new file mode 100644
index 0000000..1e25bab
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.PactDriver;
+import org.apache.flink.runtime.operators.PactTaskContext;
+import org.apache.flink.runtime.operators.ResettablePactDriver;
+import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class UnaryOperatorTestBase<S extends Function, IN, OUT> implements PactTaskContext<S, OUT> {
+
+ protected static final long DEFAULT_PER_SORT_MEM = 16 * 1024 * 1024;
+
+ protected static final int PAGE_SIZE = 32 * 1024;
+
+ private final IOManager ioManager;
+
+ private final MemoryManager memManager;
+
+ private MutableObjectIterator<IN> input;
+
+ private TypeSerializer<IN> inputSerializer;
+
+ private List<TypeComparator<IN>> comparators;
+
+ private UnilateralSortMerger<IN> sorter;
+
+ private final AbstractInvokable owner;
+
+ private final TaskConfig taskConfig;
+
+ protected final long perSortMem;
+
+ protected final double perSortFractionMem;
+
+ private Collector<OUT> output;
+
+ protected int numFileHandles;
+
+ private S stub;
+
+ private PactDriver<S, OUT> driver;
+
+ private volatile boolean running;
+
+ private ExecutionConfig executionConfig;
+
+ protected UnaryOperatorTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters) {
+ this(executionConfig, memory, maxNumSorters, DEFAULT_PER_SORT_MEM);
+ }
+
+ /**
+ * Sets up the harness. The memory manager is created with memory + maxNumSorters * perSortMemory
+ * and is {@code null} when that total is zero; negative arguments are rejected.
+ */
+ protected UnaryOperatorTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters, long perSortMemory) {
+ if (memory < 0 || maxNumSorters < 0 || perSortMemory < 0) {
+ throw new IllegalArgumentException("memory, maxNumSorters and perSortMemory must not be negative");
+ }
+ final long totalMem = memory + (maxNumSorters * perSortMemory);
+ this.perSortMem = perSortMemory;
+ // a zero total would otherwise yield NaN or Infinity here
+ this.perSortFractionMem = totalMem > 0 ? (double) perSortMemory / totalMem : 0.0;
+ this.ioManager = new IOManagerAsync();
+ this.memManager = totalMem > 0 ? new DefaultMemoryManager(totalMem, 1) : null;
+ this.owner = new DummyInvokable();
+ this.taskConfig = new TaskConfig(new Configuration());
+ this.executionConfig = executionConfig;
+ this.comparators = new ArrayList<TypeComparator<IN>>(2);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getConfigurations() {
+ ExecutionConfig withReuse = new ExecutionConfig();
+ withReuse.enableObjectReuse();
+
+ ExecutionConfig withoutReuse = new ExecutionConfig();
+ withoutReuse.disableObjectReuse();
+
+ Object[] a = { withoutReuse };
+ Object[] b = { withReuse };
+ return Arrays.asList(a, b);
+ }
+
+ public void setInput(MutableObjectIterator<IN> input, TypeSerializer<IN> serializer) {
+ this.input = input;
+ this.inputSerializer = serializer;
+ this.sorter = null;
+ }
+
+ public void addInputSorted(MutableObjectIterator<IN> input,
+ TypeSerializer<IN> serializer,
+ TypeComparator<IN> comp) throws Exception
+ {
+ this.input = null;
+ this.inputSerializer = serializer;
+ this.sorter = new UnilateralSortMerger<IN>(
+ this.memManager, this.ioManager, input, this.owner,
+ this.<IN>getInputSerializer(0),
+ comp,
+ this.perSortFractionMem, 32, 0.8f);
+ }
+
+ public void addDriverComparator(TypeComparator<IN> comparator) {
+ this.comparators.add(comparator);
+ }
+
+ public void setOutput(Collector<OUT> output) {
+ this.output = output;
+ }
+ public void setOutput(List<OUT> output, TypeSerializer<OUT> outSerializer) {
+ this.output = new ListOutputCollector<OUT>(output, outSerializer);
+ }
+
+ public int getNumFileHandlesForSort() {
+ return numFileHandles;
+ }
+
+
+ public void setNumFileHandlesForSort(int numFileHandles) {
+ this.numFileHandles = numFileHandles;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public void testDriver(PactDriver driver, Class stubClass) throws Exception {
+ testDriverInternal(driver, stubClass);
+ }
+
+ @SuppressWarnings({"unchecked","rawtypes"})
+ public void testDriverInternal(PactDriver driver, Class stubClass) throws Exception {
+ // Runs one driver life cycle against this context: setup, stub creation, prepare, open, run, close, cleanup.
+ this.driver = driver;
+ driver.setup(this);
+
+ this.stub = (S)stubClass.newInstance();
+
+ // regular running logic; 'running' is switched to false by cancel()
+ this.running = true;
+ boolean stubOpen = false;
+
+ try {
+ // run the data preparation
+ try {
+ driver.prepare();
+ }
+ catch (Throwable t) {
+ throw new Exception("The data preparation caused an error: " + t.getMessage(), t);
+ }
+
+ // open stub implementation
+ try {
+ FunctionUtils.openFunction(this.stub, getTaskConfig().getStubParameters());
+ stubOpen = true;
+ }
+ catch (Throwable t) {
+ throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
+ }
+
+ // run the user code
+ driver.run();
+
+ // close the stub here (only if not canceled), so that an exception thrown by a regular close fails the run
+ if (this.running) {
+ FunctionUtils.closeFunction (this.stub);
+ stubOpen = false;
+ }
+
+ this.output.close();
+ }
+ catch (Exception ex) {
+ // close the stub, but do not report any exceptions, since we already have another root cause
+ if (stubOpen) {
+ try {
+ FunctionUtils.closeFunction(this.stub);
+ }
+ catch (Throwable t) {
+ // ignore: the exception caught above is the one to report
+ }
+ }
+
+ // if the driver is resettable, invoke its tear-down; a failure there replaces the caught exception
+ if (this.driver instanceof ResettablePactDriver) {
+ final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
+ try {
+ resDriver.teardown();
+ } catch (Throwable t) {
+ throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t);
+ }
+ }
+
+ // drop the exception if the task was canceled, otherwise rethrow it
+ if (this.running) {
+ throw ex;
+ }
+
+ }
+ finally {
+ driver.cleanup();
+ }
+ }
+
+ @SuppressWarnings({"unchecked","rawtypes"})
+ public void testResettableDriver(ResettablePactDriver driver, Class stubClass, int iterations) throws Exception {
+ driver.setup(this);
+
+ for (int i = 0; i < iterations; i++) {
+ if (i == 0) {
+ driver.initialize();
+ }
+ else {
+ driver.reset();
+ }
+ testDriver(driver, stubClass);
+ }
+
+ driver.teardown();
+ }
+
+ public void cancel() throws Exception {
+ this.running = false;
+ this.driver.cancel();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public TaskConfig getTaskConfig() {
+ return this.taskConfig;
+ }
+
+ @Override
+ public ExecutionConfig getExecutionConfig() {
+ return executionConfig;
+ }
+
+ @Override
+ public ClassLoader getUserCodeClassLoader() {
+ return getClass().getClassLoader();
+ }
+
+ @Override
+ public IOManager getIOManager() {
+ return this.ioManager;
+ }
+
+ @Override
+ public MemoryManager getMemoryManager() {
+ return this.memManager;
+ }
+
+ @Override
+ public <X> MutableObjectIterator<X> getInput(int index) {
+ // no direct input was set: fetch the sorter's iterator once and cache it
+ if (this.input == null) {
+ try {
+ this.input = this.sorter.getIterator();
+ }
+ catch (InterruptedException e) {
+ // restore the interrupt flag and keep the cause for diagnostics
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted", e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ MutableObjectIterator<X> input = (MutableObjectIterator<X>) this.input;
+ return input;
+ }
+
+ @Override
+ public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
+ if (index != 0) {
+ throw new IllegalArgumentException();
+ }
+
+ @SuppressWarnings("unchecked")
+ TypeSerializer<X> ser = (TypeSerializer<X>) inputSerializer;
+ return new RuntimeSerializerFactory<X>(ser, (Class<X>) ser.createInstance().getClass());
+ }
+
+ @Override
+ public <X> TypeComparator<X> getDriverComparator(int index) {
+ @SuppressWarnings("unchecked")
+ TypeComparator<X> comparator = (TypeComparator<X>) this.comparators.get(index);
+ return comparator;
+ }
+
+ @Override
+ public S getStub() {
+ return this.stub;
+ }
+
+ @Override
+ public Collector<OUT> getOutputCollector() {
+ return this.output;
+ }
+
+ @Override
+ public AbstractInvokable getOwningNepheleTask() {
+ return this.owner;
+ }
+
+ @Override
+ public String formatLogString(String message) {
+ return "Driver Tester: " + message;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @After
+ public void shutdownAll() throws Exception {
+ // 1st, shutdown sorters
+ if (this.sorter != null) {
+ sorter.close();
+ }
+
+ // 2nd, shutdown I/O
+ this.ioManager.shutdown();
+ Assert.assertTrue("I/O Manager has not properly shut down.", this.ioManager.isProperlyShutDown());
+
+ // last, verify all memory is returned and shutdown mem manager
+ MemoryManager memMan = getMemoryManager();
+ if (memMan != null) {
+ Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
+ memMan.shutdown();
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final class ListOutputCollector<OUT> implements Collector<OUT> {
+
+ private final List<OUT> output;
+ private final TypeSerializer<OUT> serializer;
+
+ public ListOutputCollector(List<OUT> outputList, TypeSerializer<OUT> serializer) {
+ this.output = outputList;
+ this.serializer = serializer;
+ }
+
+
+ @Override
+ public void collect(OUT record) {
+ this.output.add(serializer.copy(record));
+ }
+
+ @Override
+ public void close() {}
+ }
+
+ public static final class CountingOutputCollector<OUT> implements Collector<OUT> {
+
+ private int num;
+
+ @Override
+ public void collect(OUT record) {
+ this.num++;
+ }
+
+ @Override
+ public void close() {}
+
+ public int getNumberOfRecords() {
+ return this.num;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
new file mode 100644
index 0000000..451cf9e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntStringTupleGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Generates every (key, value) pair with key in [0, numKeys) and value in [0, numVals), then
+ * returns null (assuming positive counts). The value is emitted as its binary string form,
+ * e.g. "101" for 5. With repeatKey the value changes fastest ((0,"0"), (0,"1"), ...); without
+ * it the key changes fastest ((0,"0"), (1,"0"), ...).
+ */
+public class UniformIntStringTupleGenerator implements MutableObjectIterator<Tuple2<Integer, String>> {
+
+ private final int numKeys;
+ private final int numVals;
+
+ private int keyCnt;
+ private int valCnt;
+
+ private boolean repeatKey;
+
+ public UniformIntStringTupleGenerator(int numKeys, int numVals, boolean repeatKey) {
+ this.numKeys = numKeys;
+ this.numVals = numVals;
+ this.repeatKey = repeatKey;
+ }
+
+ @Override
+ public Tuple2<Integer, String> next(Tuple2<Integer, String> target) {
+ // both modes fill the given target in place and then advance the two counters
+ if (repeatKey) {
+ if (keyCnt >= numKeys) {
+ return null;
+ }
+ target.f0 = keyCnt;
+ target.f1 = Integer.toBinaryString(valCnt);
+ if (++valCnt == numVals) {
+ valCnt = 0;
+ keyCnt++;
+ }
+ } else {
+ if (valCnt >= numVals) {
+ return null;
+ }
+ target.f0 = keyCnt;
+ target.f1 = Integer.toBinaryString(valCnt);
+ if (++keyCnt == numKeys) {
+ keyCnt = 0;
+ valCnt++;
+ }
+ }
+ return target;
+ }
+
+ @Override
+ public Tuple2<Integer, String> next() {
+ return next(new Tuple2<Integer, String>());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
new file mode 100644
index 0000000..457b4ad
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UniformIntTupleGenerator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Generates every (key, value) pair with key in [0, numKeys) and value in [0, numVals), then
+ * returns null (assuming positive counts). With repeatKey the value changes fastest
+ * ((0,0), (0,1), ...); without it the key changes fastest ((0,0), (1,0), ...).
+ */
+public class UniformIntTupleGenerator implements MutableObjectIterator<Tuple2<Integer, Integer>> {
+
+ private final int numKeys;
+ private final int numVals;
+
+ private int keyCnt = 0;
+ private int valCnt = 0;
+ private boolean repeatKey;
+
+ public UniformIntTupleGenerator(int numKeys, int numVals, boolean repeatKey) {
+ this.numKeys = numKeys;
+ this.numVals = numVals;
+ this.repeatKey = repeatKey;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> next(Tuple2<Integer, Integer> target) {
+ // both modes fill the given target in place and then advance the two counters
+ if (repeatKey) {
+ if (keyCnt >= numKeys) {
+ return null;
+ }
+ target.f0 = keyCnt;
+ target.f1 = valCnt;
+ if (++valCnt == numVals) {
+ valCnt = 0;
+ keyCnt++;
+ }
+ } else {
+ if (valCnt >= numVals) {
+ return null;
+ }
+ target.f0 = keyCnt;
+ target.f1 = valCnt;
+ if (++keyCnt == numKeys) {
+ keyCnt = 0;
+ valCnt++;
+ }
+ }
+ return target;
+ }
+
+ @Override
+ public Tuple2<Integer, Integer> next() {
+ return next(new Tuple2<Integer, Integer>());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/01c74338/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
index 3a76ebd..1127fca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnionIterator.java
@@ -16,26 +16,30 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators.testutils;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.flink.util.MutableObjectIterator;
-
/**
* An iterator that returns the union of a given set of iterators.
*/
-public class UnionIterator<E> implements MutableObjectIterator<E>
-{
+public class UnionIterator<E> implements MutableObjectIterator<E> {
+
private MutableObjectIterator<E> currentSource;
private List<MutableObjectIterator<E>> nextSources;
+
+
+ public UnionIterator(MutableObjectIterator<E>... iterators) {
+ this(new ArrayList<MutableObjectIterator<E>>(Arrays.asList(iterators)));
+ }
- public UnionIterator(List<MutableObjectIterator<E>> sources)
- {
+ public UnionIterator(List<MutableObjectIterator<E>> sources) {
this.currentSource = sources.remove(0);
this.nextSources = sources;
}