You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/10/23 16:06:08 UTC
[3/4] flink git commit: [FLINK-1982] [record-api] Remove dependencies
on Record API from flink-runtime tests
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
deleted file mode 100644
index 6c4659d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ /dev/null
@@ -1,1019 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.typeutils.record.RecordComparator;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
-import org.apache.flink.runtime.operators.testutils.DriverTestBase;
-import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
-import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
-import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-@SuppressWarnings("deprecation")
-public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
-
- private static final long HASH_MEM = 6*1024*1024;
-
- private static final long SORT_MEM = 3*1024*1024;
-
- private static final int NUM_SORTER = 2;
-
- private static final long BNLJN_MEM = 10 * PAGE_SIZE;
-
- private final double bnljn_frac;
-
- private final double hash_frac;
-
- @SuppressWarnings("unchecked")
- private final RecordComparator comparator1 = new RecordComparator(
- new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
-
- @SuppressWarnings("unchecked")
- private final RecordComparator comparator2 = new RecordComparator(
- new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
-
- private final List<Record> outList = new ArrayList<Record>();
-
-
- public MatchTaskTest(ExecutionConfig config) {
- super(config, HASH_MEM, NUM_SORTER, SORT_MEM);
- bnljn_frac = (double)BNLJN_MEM/this.getMemoryManager().getMemorySize();
- hash_frac = (double)HASH_MEM/this.getMemoryManager().getMemorySize();
- }
-
-
- @Test
- public void testSortBoth1MatchTask() {
- final int keyCnt1 = 20;
- final int valCnt1 = 1;
-
- final int keyCnt2 = 10;
- final int valCnt2 = 2;
-
- setOutput(this.outList);
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
- addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
- Assert.assertTrue("Resultset size was " + this.outList.size() + ". Expected was " + expCnt, this.outList.size() == expCnt);
-
- this.outList.clear();
- }
-
- @Test
- public void testSortBoth2MatchTask() {
-
- int keyCnt1 = 20;
- int valCnt1 = 1;
-
- int keyCnt2 = 20;
- int valCnt2 = 1;
-
- setOutput(this.outList);
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
- addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-
- this.outList.clear();
-
- }
-
- @Test
- public void testSortBoth3MatchTask() {
-
- int keyCnt1 = 20;
- int valCnt1 = 1;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- setOutput(this.outList);
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
- addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-
- this.outList.clear();
-
- }
-
- @Test
- public void testSortBoth4MatchTask() {
-
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 1;
-
- setOutput(this.outList);
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
- addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-
- this.outList.clear();
-
- }
-
- @Test
- public void testSortBoth5MatchTask() {
-
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- setOutput(this.outList);
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
- addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-
- this.outList.clear();
-
- }
-
- @Test
- public void testSortFirstMatchTask() {
-
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- setOutput(this.outList);
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInputSorted(new UniformRecordGenerator(keyCnt1, valCnt1, false), this.comparator1.duplicate());
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-
- this.outList.clear();
-
- }
-
- @Test
- public void testSortSecondMatchTask() {
-
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- setOutput(this.outList);
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
- addInputSorted(new UniformRecordGenerator(keyCnt2, valCnt2, false), this.comparator2.duplicate());
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-
- this.outList.clear();
-
- }
-
- @Test
- public void testMergeMatchTask() {
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- setOutput(this.outList);
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-
- try {
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
-
- Assert.assertTrue("Resultset size was "+this.outList.size()+". Expected was "+expCnt, this.outList.size() == expCnt);
-
- this.outList.clear();
-
- }
-
- @Test
- public void testFailingMatchTask() {
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- setOutput(new NirvanaOutputList());
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, true));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, true));
-
- try {
- testDriver(testTask, MockFailingMatchStub.class);
- Assert.fail("Driver did not forward Exception.");
- } catch (ExpectedTestException e) {
- // good!
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
- }
-
- @Test
- public void testCancelMatchTaskWhileSort1() {
- final int keyCnt = 20;
- final int valCnt = 20;
-
- try {
- setOutput(new NirvanaOutputList());
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
- addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
- };
- taskRunner.start();
-
- Thread.sleep(1000);
-
- cancel();
- taskRunner.interrupt();
-
- taskRunner.join(60000);
-
- assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
-
- Throwable taskError = error.get();
- if (taskError != null) {
- taskError.printStackTrace();
- fail("Error in task while canceling: " + taskError.getMessage());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testCancelMatchTaskWhileSort2() {
- final int keyCnt = 20;
- final int valCnt = 20;
-
- try {
- setOutput(new NirvanaOutputList());
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
- addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
- };
- taskRunner.start();
-
- Thread.sleep(1000);
-
- cancel();
- taskRunner.interrupt();
-
- taskRunner.join(60000);
-
- assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
-
- Throwable taskError = error.get();
- if (taskError != null) {
- taskError.printStackTrace();
- fail("Error in task while canceling: " + taskError.getMessage());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testCancelMatchTaskWhileMatching() {
- final int keyCnt = 20;
- final int valCnt = 20;
-
- try {
- setOutput(new NirvanaOutputList());
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
- addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-
- final AtomicReference<Throwable> error = new AtomicReference<>();
-
- Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockDelayingMatchStub.class);
- }
- catch (Throwable t) {
- error.set(t);
- }
- }
- };
- taskRunner.start();
-
- Thread.sleep(1000);
-
- cancel();
- taskRunner.interrupt();
-
- taskRunner.join(60000);
-
- assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
-
- Throwable taskError = error.get();
- if (taskError != null) {
- taskError.printStackTrace();
- fail("Error in task while canceling: " + taskError.getMessage());
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testHash1MatchTask() {
- int keyCnt1 = 20;
- int valCnt1 = 1;
-
- int keyCnt2 = 10;
- int valCnt2 = 2;
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(this.outList);
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test caused an exception.");
- }
-
- final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
- Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
- this.outList.clear();
- }
-
- @Test
- public void testHash2MatchTask() {
- int keyCnt1 = 20;
- int valCnt1 = 1;
-
- int keyCnt2 = 20;
- int valCnt2 = 1;
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(this.outList);
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test caused an exception.");
- }
-
- final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
- Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
- this.outList.clear();
- }
-
- @Test
- public void testHash3MatchTask() {
- int keyCnt1 = 20;
- int valCnt1 = 1;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(this.outList);
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test caused an exception.");
- }
-
- final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
- Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
- this.outList.clear();
- }
-
- @Test
- public void testHash4MatchTask() {
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 1;
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(this.outList);
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test caused an exception.");
- }
-
- final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
- Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
- this.outList.clear();
- }
-
- @Test
- public void testHash5MatchTask() {
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(this.outList);
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- testDriver(testTask, MockMatchStub.class);
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test caused an exception.");
- }
-
- final int expCnt = valCnt1*valCnt2*Math.min(keyCnt1, keyCnt2);
- Assert.assertEquals("Wrong result set size.", expCnt, this.outList.size());
- this.outList.clear();
- }
-
- @Test
- public void testFailingHashFirstMatchTask() {
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(new NirvanaOutputList());
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- testDriver(testTask, MockFailingMatchStub.class);
- Assert.fail("Function exception was not forwarded.");
- } catch (ExpectedTestException etex) {
- // good!
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test caused an exception.");
- }
- }
-
- @Test
- public void testFailingHashSecondMatchTask() {
- int keyCnt1 = 20;
- int valCnt1 = 20;
-
- int keyCnt2 = 20;
- int valCnt2 = 20;
-
- addInput(new UniformRecordGenerator(keyCnt1, valCnt1, false));
- addInput(new UniformRecordGenerator(keyCnt2, valCnt2, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(new NirvanaOutputList());
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- try {
- testDriver(testTask, MockFailingMatchStub.class);
- Assert.fail("Function exception was not forwarded.");
- } catch (ExpectedTestException etex) {
- // good!
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("Test caused an exception.");
- }
- }
-
- @Test
- public void testCancelHashMatchTaskWhileBuildFirst() {
- final int keyCnt = 20;
- final int valCnt = 20;
-
- try {
- addInput(new DelayingInfinitiveInputIterator(100));
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
-
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
-
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-
- setOutput(new NirvanaOutputList());
-
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
- }
- }
- };
- taskRunner.start();
-
- Thread.sleep(1000);
- cancel();
-
- try {
- taskRunner.join();
- }
- catch (InterruptedException ie) {
- Assert.fail("Joining threads failed");
- }
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- @Test
- public void testHashCancelMatchTaskWhileBuildSecond() {
- final int keyCnt = 20;
- final int valCnt = 20;
-
- try {
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInput(new DelayingInfinitiveInputIterator(100));
-
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
-
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-
- setOutput(new NirvanaOutputList());
-
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
- }
- }
- };
- taskRunner.start();
-
- Thread.sleep(1000);
- cancel();
-
- try {
- taskRunner.join();
- }
- catch (InterruptedException ie) {
- Assert.fail("Joining threads failed");
- }
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
- }
- catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
- }
- }
-
- @Test
- public void testHashFirstCancelMatchTaskWhileMatching() {
- int keyCnt = 20;
- int valCnt = 20;
-
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(new NirvanaOutputList());
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
- }
- }
- };
- taskRunner.start();
-
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
- tct.start();
-
- try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
- }
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
- }
-
- @Test
- public void testHashSecondCancelMatchTaskWhileMatching() {
- int keyCnt = 20;
- int valCnt = 20;
-
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addInput(new UniformRecordGenerator(keyCnt, valCnt, false));
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- setOutput(new NirvanaOutputList());
- getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
- getTaskConfig().setRelativeMemoryDriver(hash_frac);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
- }
- }
- };
- taskRunner.start();
-
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
- tct.start();
-
- try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
- }
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
- }
-
- // =================================================================================================
-
- public static final class MockMatchStub extends JoinFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
- out.collect(record1);
- }
- }
-
- public static final class MockFailingMatchStub extends JoinFunction {
- private static final long serialVersionUID = 1L;
-
- private int cnt = 0;
-
- @Override
- public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
- if (++this.cnt >= 10) {
- throw new ExpectedTestException();
- }
- out.collect(record1);
- }
- }
-
- public static final class MockDelayingMatchStub extends JoinFunction {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void join(Record record1, Record record2, Collector<Record> out) throws Exception {
- try {
- Thread.sleep(100);
- } catch (InterruptedException e) {
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
index f59c4a3..415b6bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskExternalITCase.java
@@ -26,9 +26,9 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
@@ -46,7 +46,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
private final RecordComparator comparator = new RecordComparator(
new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
- private final List<Record> outList = new ArrayList<Record>();
+ private final List<Record> outList = new ArrayList<>();
public ReduceTaskExternalITCase(ExecutionConfig config) {
@@ -68,7 +68,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
try {
addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
- GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
testDriver(testTask, MockReduceStub.class);
} catch (Exception e) {
@@ -100,7 +100,7 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
try {
addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
- GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
testDriver(testTask, MockReduceStub.class);
} catch (Exception e) {
@@ -130,14 +130,14 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
CombiningUnilateralSortMerger<Record> sorter = null;
try {
- sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(),
+ sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
this.perSortFractionMem,
2, 0.8f, true);
addInput(sorter.getIterator());
- GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
testDriver(testTask, MockCombiningReduceStub.class);
} catch (Exception e) {
@@ -176,14 +176,14 @@ public class ReduceTaskExternalITCase extends DriverTestBase<RichGroupReduceFunc
CombiningUnilateralSortMerger<Record> sorter = null;
try {
- sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(),
+ sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(),
this.perSortFractionMem,
2, 0.8f, false);
addInput(sorter.getIterator());
- GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
testDriver(testTask, MockCombiningReduceStub.class);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
index cc25c99..8bc7fe5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/ReduceTaskTest.java
@@ -27,9 +27,9 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction.Combinable;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable;
import org.apache.flink.runtime.operators.sort.CombiningUnilateralSortMerger;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.NirvanaOutputList;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
@@ -51,7 +52,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
private final RecordComparator comparator = new RecordComparator(
new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
- private final List<Record> outList = new ArrayList<Record>();
+ private final List<Record> outList = new ArrayList<>();
public ReduceTaskTest(ExecutionConfig config) {
super(config, 0, 1, 3*1024*1024);
@@ -69,7 +70,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
try {
addInputSorted(new UniformRecordGenerator(keyCnt, valCnt, false), this.comparator.duplicate());
- GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
testDriver(testTask, MockReduceStub.class);
} catch (Exception e) {
@@ -96,7 +97,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
- GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
try {
testDriver(testTask, MockReduceStub.class);
@@ -125,13 +126,13 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
CombiningUnilateralSortMerger<Record> sorter = null;
try {
- sorter = new CombiningUnilateralSortMerger<Record>(new MockCombiningReduceStub(),
+ sorter = new CombiningUnilateralSortMerger<>(new MockCombiningReduceStub(),
getMemoryManager(), getIOManager(), new UniformRecordGenerator(keyCnt, valCnt, false),
getOwningNepheleTask(), RecordSerializerFactory.get(), this.comparator.duplicate(), this.perSortFractionMem,
4, 0.8f, true);
addInput(sorter.getIterator());
- GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
testDriver(testTask, MockCombiningReduceStub.class);
} catch (Exception e) {
@@ -168,7 +169,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
setOutput(this.outList);
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
- GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
try {
testDriver(testTask, MockFailingReduceStub.class);
@@ -190,7 +191,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
- final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
try {
addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator.duplicate());
@@ -238,7 +239,7 @@ public class ReduceTaskTest extends DriverTestBase<RichGroupReduceFunction<Recor
setOutput(new NirvanaOutputList());
getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
- final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<Record, Record>();
+ final GroupReduceDriver<Record, Record> testTask = new GroupReduceDriver<>();
final AtomicBoolean success = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
index 1f19699..542812c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainTaskTest.java
@@ -21,17 +21,17 @@ package org.apache.flink.runtime.operators.chaining;
import java.util.ArrayList;
import java.util.List;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.BatchTask;
-import org.apache.flink.runtime.operators.MapTaskTest.MockMapStub;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.FlatMapTaskTest.MockMapStub;
import org.apache.flink.runtime.operators.ReduceTaskTest.MockReduceStub;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
@@ -49,14 +49,13 @@ import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({Task.class, ResultPartitionWriter.class})
-@SuppressWarnings("deprecation")
public class ChainTaskTest extends TaskTestBase {
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
private static final int NETWORK_BUFFER_SIZE = 1024;
- private final List<Record> outList = new ArrayList<Record>();
+ private final List<Record> outList = new ArrayList<>();
@SuppressWarnings("unchecked")
private final RecordComparatorFactory compFact = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}, new boolean[] {true});
@@ -95,16 +94,16 @@ public class ChainTaskTest extends TaskTestBase {
combineConfig.setRelativeMemoryDriver(memoryFraction);
// udf
- combineConfig.setStubWrapper(new UserCodeClassWrapper<MockReduceStub>(MockReduceStub.class));
+ combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockReduceStub.class));
getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, combineConfig, "combine");
}
// chained map+combine
{
- BatchTask<GenericCollectorMap<Record, Record>, Record> testTask =
- new BatchTask<GenericCollectorMap<Record, Record>, Record>();
- registerTask(testTask, CollectorMapDriver.class, MockMapStub.class);
+ BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
+ new BatchTask<>();
+ registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
try {
testTask.invoke();
@@ -156,17 +155,17 @@ public class ChainTaskTest extends TaskTestBase {
combineConfig.setRelativeMemoryDriver(memoryFraction);
// udf
- combineConfig.setStubWrapper(new UserCodeClassWrapper<MockFailingCombineStub>(MockFailingCombineStub.class));
+ combineConfig.setStubWrapper(new UserCodeClassWrapper<>(MockFailingCombineStub.class));
getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, combineConfig, "combine");
}
// chained map+combine
{
- final BatchTask<GenericCollectorMap<Record, Record>, Record> testTask =
- new BatchTask<GenericCollectorMap<Record, Record>, Record>();
+ final BatchTask<FlatMapFunction<Record, Record>, Record> testTask =
+ new BatchTask<>();
- super.registerTask(testTask, CollectorMapDriver.class, MockMapStub.class);
+ super.registerTask(testTask, FlatMapDriver.class, MockMapStub.class);
boolean stubFailed = false;
http://git-wip-us.apache.org/repos/asf/flink/blob/c0d7073a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 777bfc8..63f54ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -19,11 +19,11 @@
package org.apache.flink.runtime.operators.testutils;
import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.io.DelimitedInputFormat;
+import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.io.DelimitedInputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.core.fs.Path;
@@ -94,7 +94,7 @@ public abstract class TaskTestBase extends TestLogger {
final TaskConfig config = new TaskConfig(this.mockEnv.getTaskConfiguration());
config.setDriver(driver);
- config.setStubWrapper(new UserCodeClassWrapper<RichFunction>(stubClass));
+ config.setStubWrapper(new UserCodeClassWrapper<>(stubClass));
task.setEnvironment(this.mockEnv);
@@ -116,17 +116,17 @@ public abstract class TaskTestBase extends TestLogger {
}
}
- public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat> stubClass, String outPath) {
+ public void registerFileOutputTask(AbstractInvokable outTask, Class<? extends FileOutputFormat<Record>> stubClass, String outPath) {
registerFileOutputTask(outTask, InstantiationUtil.instantiate(stubClass, FileOutputFormat.class), outPath);
}
- public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat outputFormat, String outPath) {
+ public void registerFileOutputTask(AbstractInvokable outTask, FileOutputFormat<Record> outputFormat, String outPath) {
TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
outputFormat.setOutputFilePath(new Path(outPath));
outputFormat.setWriteMode(WriteMode.OVERWRITE);
- dsConfig.setStubWrapper(new UserCodeObjectWrapper<FileOutputFormat>(outputFormat));
+ dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(outputFormat));
outTask.setEnvironment(this.mockEnv);
@@ -139,9 +139,9 @@ public abstract class TaskTestBase extends TestLogger {
}
public void registerFileInputTask(AbstractInvokable inTask,
- Class<? extends DelimitedInputFormat> stubClass, String inPath, String delimiter)
+ Class<? extends DelimitedInputFormat<Record>> stubClass, String inPath, String delimiter)
{
- DelimitedInputFormat format;
+ DelimitedInputFormat<Record> format;
try {
format = stubClass.newInstance();
}
@@ -153,7 +153,7 @@ public abstract class TaskTestBase extends TestLogger {
format.setDelimiter(delimiter);
TaskConfig dsConfig = new TaskConfig(this.mockEnv.getTaskConfiguration());
- dsConfig.setStubWrapper(new UserCodeObjectWrapper<DelimitedInputFormat>(format));
+ dsConfig.setStubWrapper(new UserCodeObjectWrapper<>(format));
this.inputSplitProvider.addInputSplits(inPath, 5);