You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/25 16:06:16 UTC

flink git commit: [FLINK-2528] [tests] Increase robustness and error reporting of MatchTaskTest

Repository: flink
Updated Branches:
  refs/heads/master 5156a1b3f -> 6f07c5f3a


[FLINK-2528] [tests] Increase robustness and error reporting of MatchTaskTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6f07c5f3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6f07c5f3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6f07c5f3

Branch: refs/heads/master
Commit: 6f07c5f3af1b4fd67cb61ad140b99095f7ee6c45
Parents: 5156a1b
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Aug 25 14:42:47 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Aug 25 14:42:47 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/operators/MatchTaskTest.java  | 280 ++++++++++---------
 .../operators/testutils/DriverTestBase.java     |  22 +-
 2 files changed, 170 insertions(+), 132 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6f07c5f3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
index 8fbf05e..15f3d0c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/MatchTaskTest.java
@@ -16,12 +16,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -38,9 +38,12 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Key;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.Collector;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import static org.junit.Assert.*;
+
 @SuppressWarnings("deprecation")
 public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
 	
@@ -58,11 +61,11 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator1 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
 	
 	@SuppressWarnings("unchecked")
 	private final RecordComparator comparator2 = new RecordComparator(
-		new int[]{0}, (Class<? extends Key<?>>[])new Class[]{ IntValue.class });
+		new int[]{0}, (Class<? extends Key<?>>[])new Class<?>[]{ IntValue.class });
 	
 	private final List<Record> outList = new ArrayList<Record>();
 	
@@ -393,150 +396,177 @@ public class MatchTaskTest extends DriverTestBase<FlatJoinFunction<Record, Recor
 	
 	@Test
 	public void testCancelMatchTaskWhileSort1() {
-		int keyCnt = 20;
-		int valCnt = 20;
-		
-		setOutput(new NirvanaOutputList());
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+		final int keyCnt = 20;
+		final int valCnt = 20;
 		
 		try {
-			addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMatchStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
+			setOutput(new NirvanaOutputList());
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+			getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+			setNumFileHandlesForSort(4);
+			
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+			
+			try {
+				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+			} catch (Exception e) {
+				e.printStackTrace();
+				Assert.fail("The test caused an exception.");
+			}
+	
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+
+			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort1()") {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
 				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+
+			cancel();
+			taskRunner.interrupt();
+
+			taskRunner.join(60000);
+
+			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+			Throwable taskError = error.get();
+			if (taskError != null) {
+				taskError.printStackTrace();
+				fail("Error in task while canceling: " + taskError.getMessage());
 			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
 		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 	
 	@Test
 	public void testCancelMatchTaskWhileSort2() {
-		int keyCnt = 20;
-		int valCnt = 20;
-		
-		setOutput(new NirvanaOutputList());
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+		final int keyCnt = 20;
+		final int valCnt = 20;
 		
 		try {
-			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-			addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("The test caused an exception.");
-		}
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
-		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockMatchStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
+			setOutput(new NirvanaOutputList());
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+			getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+			setNumFileHandlesForSort(4);
+			
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+			
+			try {
+				addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+				addInputSorted(new DelayingInfinitiveInputIterator(100), this.comparator1.duplicate());
+			} catch (Exception e) {
+				e.printStackTrace();
+				Assert.fail("The test caused an exception.");
+			}
+
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+
+			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileSort2()") {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockMatchStub.class);
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
 				}
+			};
+			taskRunner.start();
+
+			Thread.sleep(1000);
+
+			cancel();
+			taskRunner.interrupt();
+
+			taskRunner.join(60000);
+
+			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+
+			Throwable taskError = error.get();
+			if (taskError != null) {
+				taskError.printStackTrace();
+				fail("Error in task while canceling: " + taskError.getMessage());
 			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
 		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 	
 	@Test
 	public void testCancelMatchTaskWhileMatching() {
-		int keyCnt = 20;
-		int valCnt = 20;
-		
-		setOutput(new NirvanaOutputList());
-		addDriverComparator(this.comparator1);
-		addDriverComparator(this.comparator2);
-		getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
-		getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
-		getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
-		setNumFileHandlesForSort(4);
-		
-		final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
-		
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-		addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
-		
-		final AtomicBoolean success = new AtomicBoolean(false);
+		final int keyCnt = 20;
+		final int valCnt = 20;
 		
-		Thread taskRunner = new Thread() {
-			@Override
-			public void run() {
-				try {
-					testDriver(testTask, MockDelayingMatchStub.class);
-					success.set(true);
-				} catch (Exception ie) {
-					ie.printStackTrace();
+		try {
+			setOutput(new NirvanaOutputList());
+			addDriverComparator(this.comparator1);
+			addDriverComparator(this.comparator2);
+			getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
+			getTaskConfig().setDriverStrategy(DriverStrategy.MERGE);
+			getTaskConfig().setRelativeMemoryDriver(bnljn_frac);
+			setNumFileHandlesForSort(4);
+			
+			final JoinDriver<Record, Record, Record> testTask = new JoinDriver<Record, Record, Record>();
+			
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+			addInput(new UniformRecordGenerator(keyCnt, valCnt, true));
+			
+			final AtomicReference<Throwable> error = new AtomicReference<>();
+			
+			Thread taskRunner = new Thread("Task runner for testCancelMatchTaskWhileMatching()") {
+				@Override
+				public void run() {
+					try {
+						testDriver(testTask, MockDelayingMatchStub.class);
+					}
+					catch (Throwable t) {
+						error.set(t);
+					}
 				}
+			};
+			taskRunner.start();
+			
+			Thread.sleep(1000);
+			
+			cancel();
+			taskRunner.interrupt();
+			
+			taskRunner.join(60000);
+			
+			assertFalse("Task thread did not finish within 60 seconds", taskRunner.isAlive());
+			
+			Throwable taskError = error.get();
+			if (taskError != null) {
+				taskError.printStackTrace();
+				fail("Error in task while canceling: " + taskError.getMessage());
 			}
-		};
-		taskRunner.start();
-		
-		TaskCancelThread tct = new TaskCancelThread(1, taskRunner, this);
-		tct.start();
-		
-		try {
-			tct.join();
-			taskRunner.join();
-		} catch(InterruptedException ie) {
-			Assert.fail("Joining threads failed");
 		}
-		
-		Assert.assertTrue("Test threw an exception even though it was properly canceled.", success.get());
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/6f07c5f3/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index e4aad98..12ca909 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedList;
@@ -88,7 +86,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 	
 	private PactDriver<S, Record> driver;
 	
-	private volatile boolean running;
+	private volatile boolean running = true;
 
 	private ExecutionConfig executionConfig;
 	
@@ -119,7 +117,7 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 	}
 
 	@Parameterized.Parameters
-	public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
+	public static Collection<Object[]> getConfigurations() {
 
 		LinkedList<Object[]> configs = new LinkedList<Object[]>();
 
@@ -184,7 +182,6 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 		this.stub = (S)stubClass.newInstance();
 
 		// regular running logic
-		this.running = true;
 		boolean stubOpen = false;
 
 		try {
@@ -205,6 +202,10 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 				throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
 			}
 
+			if (!running) {
+				return;
+			}
+			
 			// run the user code
 			driver.run();
 
@@ -222,10 +223,10 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 				try {
 					FunctionUtils.closeFunction(this.stub);
 				}
-				catch (Throwable t) {}
+				catch (Throwable ignored) {}
 			}
 
-			// if resettable driver invoke treardown
+			// if resettable driver invoke tear down
 			if (this.driver instanceof ResettablePactDriver) {
 				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
 				try {
@@ -269,6 +270,13 @@ public class DriverTestBase<S extends Function> extends TestLogger implements Pa
 	
 	public void cancel() throws Exception {
 		this.running = false;
+		
+		// compensate for races, where cancel is called before the driver is set
+		// note that this is an artifact of a bad design of this test base, where the setup
+		// of the basic properties is not separated from the invocation of the execution logic 
+		while (this.driver == null) {
+			Thread.sleep(200);
+		}
 		this.driver.cancel();
 	}