You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/25 16:06:16 UTC
flink git commit: [FLINK-2528] [tests] Increase robustness and error
reporting of MatchTaskTest
Repository: flink
Updated Branches:
refs/heads/master 5156a1b3f -> 6f07c5f3a
[FLINK-2528] [tests] Increase robustness and error reporting of MatchTaskTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f07c5f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f07c5f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f07c5f3
Branch: refs/heads/master
Commit: 6f07c5f3af1b4fd67cb61ad140b99095f7ee6c45
Parents: 5156a1b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 25 14:42:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 25 14:42:47 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/operators/MatchTaskTest.java | 280 ++++++++++---------
.../operators/testutils/DriverTestBase.java | 22 +-
2 files changed, 170 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6f07c5f3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 8fbf05e..15f3d0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -16,12 +16,12 @@
* limitations under the License.
*/
-
package org.apache.flink.runtime.operators;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -38,9 +38,12 @@ import org.apache.flink.types.IntValue;
import org.apache.flink.types.Key;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
+
import org.junit.Assert;
import org.junit.Test;
+import static org.junit.Assert.*;
+
@SuppressWarnings("deprecation")
public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
@@ -58,11 +61,11 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
@SuppressWarnings("unchecked")
private final RecordComparator comparator1 = new RecordComparator(
- new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+ new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
@SuppressWarnings("unchecked")
private final RecordComparator comparator2 = new RecordComparator(
- new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+ new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
private final List<Record> outList = new ArrayList<Record>();
@@ -393,150 +396,177 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
@Test
public void testCancelMatchTaskWhileSort1() {
- int keyCnt = 20;
- int valCnt = 20;
-
- setOutput(new NirvanaOutputList());
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+ final int keyCnt = 20;
+ final int valCnt = 20;
try {
- addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
- addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
+ setOutput(new NirvanaOutputList());
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
+ getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+ getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+ getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+ setNumFileHandlesForSort(4);
+
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+
+ try {
+ addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test caused an exception.");
+ }
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") {
+ @Override
+ public void run() {
+ try {
+ testDriver(testTask, MockMatchStub.class);
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
}
+ };
+ taskRunner.start();
+
+ Thread.sleep(1000);
+
+ cancel();
+ taskRunner.interrupt();
+
+ taskRunner.join(60000);
+
+ assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+ Throwable taskError = error.get();
+ if (taskError != null) {
+ taskError.printStackTrace();
+ fail("Error in task while canceling: " + taskError.getMessage());
}
- };
- taskRunner.start();
-
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
- tct.start();
-
- try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
}
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testCancelMatchTaskWhileSort2() {
- int keyCnt = 20;
- int valCnt = 20;
-
- setOutput(new NirvanaOutputList());
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+ final int keyCnt = 20;
+ final int valCnt = 20;
try {
- addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
- addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail("The test caused an exception.");
- }
-
- final AtomicBoolean success = new AtomicBoolean(false);
-
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockMatchStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
+ setOutput(new NirvanaOutputList());
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
+ getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+ getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+ getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+ setNumFileHandlesForSort(4);
+
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+
+ try {
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+ addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail("The test caused an exception.");
+ }
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") {
+ @Override
+ public void run() {
+ try {
+ testDriver(testTask, MockMatchStub.class);
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
}
+ };
+ taskRunner.start();
+
+ Thread.sleep(1000);
+
+ cancel();
+ taskRunner.interrupt();
+
+ taskRunner.join(60000);
+
+ assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+ Throwable taskError = error.get();
+ if (taskError != null) {
+ taskError.printStackTrace();
+ fail("Error in task while canceling: " + taskError.getMessage());
}
- };
- taskRunner.start();
-
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
- tct.start();
-
- try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
}
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
public void testCancelMatchTaskWhileMatching() {
- int keyCnt = 20;
- int valCnt = 20;
-
- setOutput(new NirvanaOutputList());
- addDriverComparator(this.comparator1);
- addDriverComparator(this.comparator2);
- getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
- getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
- getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
- setNumFileHandlesForSort(4);
-
- final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-
- addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
- addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-
- final AtomicBoolean success = new AtomicBoolean(false);
+ final int keyCnt = 20;
+ final int valCnt = 20;
- Thread taskRunner = new Thread() {
- @Override
- public void run() {
- try {
- testDriver(testTask, MockDelayingMatchStub.class);
- success.set(true);
- } catch (Exception ie) {
- ie.printStackTrace();
+ try {
+ setOutput(new NirvanaOutputList());
+ addDriverComparator(this.comparator1);
+ addDriverComparator(this.comparator2);
+ getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+ getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+ getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+ setNumFileHandlesForSort(4);
+
+ final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+ addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+
+ Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") {
+ @Override
+ public void run() {
+ try {
+ testDriver(testTask, MockDelayingMatchStub.class);
+ }
+ catch (Throwable t) {
+ error.set(t);
+ }
}
+ };
+ taskRunner.start();
+
+ Thread.sleep(1000);
+
+ cancel();
+ taskRunner.interrupt();
+
+ taskRunner.join(60000);
+
+ assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+ Throwable taskError = error.get();
+ if (taskError != null) {
+ taskError.printStackTrace();
+ fail("Error in task while canceling: " + taskError.getMessage());
}
- };
- taskRunner.start();
-
- TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
- tct.start();
-
- try {
- tct.join();
- taskRunner.join();
- } catch(InterruptedException ie) {
- Assert.fail("Joining threads failed");
}
-
- Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/6f07c5f3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index e4aad98..12ca909 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.operators.testutils;
-import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
@@ -88,7 +86,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
private PactDriver<S, Record> driver;
- private volatile boolean running;
+ private volatile boolean running = true;
private ExecutionConfig executionConfig;
@@ -119,7 +117,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
}
@Parameterized.Parameters
- public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+ public static Collection<Object[]> getConfigurations() {
LinkedList<Object[]> configs = new LinkedList<Object[]>();
@@ -184,7 +182,6 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
this.stub = (S)stubClass.newInstance();
// regular running logic
- this.running = true;
boolean stubOpen = false;
try {
@@ -205,6 +202,10 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
}
+ if (!running) {
+ return;
+ }
+
// run the user code
driver.run();
@@ -222,10 +223,10 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
try {
FunctionUtils.closeFunction(this.stub);
}
- catch (Throwable t) {}
+ catch (Throwable ignored) {}
}
- // if resettable driver invoke treardown
+ // if resettable driver invoke tear down
if (this.driver instanceof ResettablePactDriver) {
final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
try {
@@ -269,6 +270,13 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
public void cancel() throws Exception {
this.running = false;
+
+ // compensate for races, where cancel is called before the driver is set
// note that this is an artifact of a bad design of this test base, where the setup
+ // of the basic properties is not separated from the invocation of the execution logic
+ while (this.driver == null) {
+ Thread.sleep(200);
+ }
this.driver.cancel();
}