You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2016/10/12 09:59:11 UTC

flink git commit: [FLINK-3706] Fix YARN test instability

Repository: flink
Updated Branches:
  refs/heads/master 1dda3ad00 -> 7b79c8b52


[FLINK-3706] Fix YARN test instability

The most important change in this commit is that the `YarnTestBase.Runner` doesn't do "try {} catch (Throwable t) { fail(t); }" anymore. That pattern did not lead to a test failure, because it was called outside the main thread.
With the change, all throwables are reported back to the main thread and fail the test there properly (many YARN tests benefit from this change).

This closes #2622


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b79c8b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b79c8b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b79c8b5

Branch: refs/heads/master
Commit: 7b79c8b52832e3a99e565e865831b56615416ae7
Parents: 1dda3ad
Author: Robert Metzger <rm...@apache.org>
Authored: Mon Oct 10 17:04:16 2016 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Oct 12 11:58:35 2016 +0200

----------------------------------------------------------------------
 .../YARNSessionCapacitySchedulerITCase.java     | 13 ++---
 .../flink/yarn/YARNSessionFIFOITCase.java       | 17 +------
 .../yarn/YARNSessionFIFOSecuredITCase.java      |  3 --
 .../org/apache/flink/yarn/YarnTestBase.java     | 50 +++++++++++---------
 4 files changed, 36 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b79c8b5/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 650397d..ec66eb2 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -309,12 +309,13 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	}
 
 	/**
-	 * Test deployment to non-existing queue. (user-reported error)
-	 * Deployment to the queue is possible because there are no queues, so we don't check.
+	 * Test deployment to non-existing queue & ensure that the system logs a WARN message
+	 * for the user. (Users had unexpected behavior of Flink on YARN because they mistyped the
+	 * target queue. With an error message, we can help users identifying the issue)
 	 */
 	@Test
-	public void testNonexistingQueue() {
-		LOG.info("Starting testNonexistingQueue()");
+	public void testNonexistingQueueWARNmessage() {
+		LOG.info("Starting testNonexistingQueueWARNmessage()");
 		addTestAppender(YarnClusterDescriptor.class, Level.WARN);
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 				"-t", flinkLibFolder.getAbsolutePath(),
@@ -322,8 +323,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-jm", "768",
 				"-tm", "1024",
 				"-qu", "doesntExist"}, "to unknown queue: doesntExist", null, RunTypes.YARN_SESSION, 1);
-		checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");
-		LOG.info("Finished testNonexistingQueue()");
+		checkForLogString("The specified queue 'doesntExist' does not exist. Available queues");
+		LOG.info("Finished testNonexistingQueueWARNmessage()");
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/7b79c8b5/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index ca696f9..c5659eb 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -84,7 +84,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	public void testDetachedMode() {
 		LOG.info("Starting testDetachedMode()");
 		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
-		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+		startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 						"-t", flinkLibFolder.getAbsolutePath(),
 						"-n", "1",
 						"-jm", "768",
@@ -162,21 +162,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		LOG.info("Finished testQueryCluster()");
 	}
 
-	/**
-	 * Test deployment to non-existing queue. (user-reported error)
-	 * Deployment to the queue is possible because there are no queues, so we don't check.
-	 */
-	@Test
-	public void testNonexistingQueue() {
-		LOG.info("Starting testNonexistingQueue()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-				"-t", flinkLibFolder.getAbsolutePath(),
-				"-n", "1",
-				"-jm", "768",
-				"-tm", "1024",
-				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", null, RunTypes.YARN_SESSION, 0);
-		LOG.info("Finished testNonexistingQueue()");
-	}
 
 	/**
 	 * The test cluster has the following resources:

http://git-wip-us.apache.org/repos/asf/flink/blob/7b79c8b5/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 0b7c230..0725bf2 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -90,9 +90,6 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
 	public void testQueryCluster() {}
 
 	@Override
-	public void testNonexistingQueue() {}
-
-	@Override
 	public void testResourceComputation() {}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/7b79c8b5/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index afdd400..dba87de 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -490,7 +490,7 @@ public abstract class YarnTestBase extends TestLogger {
 
 		final int START_TIMEOUT_SECONDS = 60;
 
-		Runner runner = new Runner(args, type);
+		Runner runner = new Runner(args, type, 0);
 		runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
 		runner.start();
 
@@ -505,7 +505,10 @@ public abstract class YarnTestBase extends TestLogger {
 			// check if thread died
 			if(!runner.isAlive()) {
 				sendOutput();
-				Assert.fail("Runner thread died before the test was finished. Return value = "+runner.getReturnValue());
+				if(runner.getRunnerError() != null) {
+					throw new RuntimeException("Runner failed with exception.", runner.getRunnerError());
+				}
+				Assert.fail("Runner thread died before the test was finished.");
 			}
 		}
 
@@ -524,10 +527,10 @@ public abstract class YarnTestBase extends TestLogger {
 	 * @param terminateAfterString the runner is searching the stdout and stderr for this string. as soon as it appears, the test has passed
 	 * @param failOnPatterns The runner is searching stdout and stderr for the pattern (regexp) specified here. If one appears, the test has failed
 	 * @param type Set the type of the runner
-	 * @param returnCode Expected return code from the runner.
+	 * @param expectedReturnValue Expected return code from the runner.
 	 * @param checkLogForTerminateString  If true, the runner checks also the log4j logger for the terminate string
 	 */
-	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnPatterns, RunTypes type, int returnCode, boolean checkLogForTerminateString) {
+	protected void runWithArgs(String[] args, String terminateAfterString, String[] failOnPatterns, RunTypes type, int expectedReturnValue, boolean checkLogForTerminateString) {
 		LOG.info("Running with args {}", Arrays.toString(args));
 
 		outContent = new ByteArrayOutputStream();
@@ -540,7 +543,7 @@ public abstract class YarnTestBase extends TestLogger {
 		final int START_TIMEOUT_SECONDS = 180;
 		final long deadline = System.currentTimeMillis() + (START_TIMEOUT_SECONDS * 1000);
 		
-		Runner runner = new Runner(args, type);
+		Runner runner = new Runner(args, type, expectedReturnValue);
 		runner.start();
 
 		boolean expectedStringSeen = false;
@@ -589,25 +592,22 @@ public abstract class YarnTestBase extends TestLogger {
 			else {
 				// check if thread died
 				if (!runner.isAlive()) {
-					if (runner.getReturnValue() != 0) {
-						Assert.fail("Runner thread died before the test was finished. Return value = "
-								+ runner.getReturnValue());
-					} else {
-						LOG.info("Runner stopped earlier than expected with return value = 0");
-					}
 					// leave loop: the runner died, so we can not expect new strings to show up.
 					break;
 				}
 			}
 		}
-		while (!expectedStringSeen && System.currentTimeMillis() < deadline);
+		while (runner.getRunnerError() == null && !expectedStringSeen && System.currentTimeMillis() < deadline);
 		
 		sendOutput();
+
+		if(runner.getRunnerError() != null) {
+			// this lets the test fail.
+			throw new RuntimeException("Runner failed", runner.getRunnerError());
+		}
 		Assert.assertTrue("During the timeout period of " + START_TIMEOUT_SECONDS + " seconds the " +
 				"expected string did not show up", expectedStringSeen);
 
-		// check for 0 return code
-		Assert.assertEquals("Expected return value", returnCode, runner.getReturnValue());
 		LOG.info("Test was successful");
 	}
 
@@ -621,22 +621,22 @@ public abstract class YarnTestBase extends TestLogger {
 
 	public static class Runner extends Thread {
 		private final String[] args;
-		private int returnValue;
+		private final int expectedReturnValue;
 		private RunTypes type;
 		private FlinkYarnSessionCli yCli;
+		private Throwable runnerError;
 
-		public Runner(String[] args, RunTypes type) {
+		public Runner(String[] args, RunTypes type, int expectedReturnValue) {
 			this.args = args;
 			this.type = type;
+			this.expectedReturnValue = expectedReturnValue;
 		}
 
-		public int getReturnValue() {
-			return returnValue;
-		}
 
 		@Override
 		public void run() {
 			try {
+				int returnValue;
 				switch (type) {
 					case YARN_SESSION:
 						yCli = new FlinkYarnSessionCli("", "", false);
@@ -670,11 +670,13 @@ public abstract class YarnTestBase extends TestLogger {
 						throw new RuntimeException("Unknown type " + type);
 				}
 
-				if (returnValue != 0) {
-					Assert.fail("The YARN session returned with non-null value=" + returnValue);
+				if (returnValue != this.expectedReturnValue) {
+					Assert.fail("The YARN session returned with unexpected value=" + returnValue + " expected=" + expectedReturnValue);
 				}
 			} catch (Throwable t) {
-				Assert.fail(t.getMessage());
+				LOG.info("Runner stopped with exception", t);
+				// save error.
+				this.runnerError = t;
 			}
 		}
 
@@ -684,6 +686,10 @@ public abstract class YarnTestBase extends TestLogger {
 				yCli.stop();
 			}
 		}
+
+		public Throwable getRunnerError() {
+			return runnerError;
+		}
 	}
 
 	// -------------------------- Tear down -------------------------- //