You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/03/04 11:52:42 UTC

flink git commit: [flink-yarn-tests] Add check for exceptions in the flink logs.

Repository: flink
Updated Branches:
  refs/heads/master 94a66d570 -> 7abed950d


[flink-yarn-tests] Add check for exceptions in the flink logs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7abed950
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7abed950
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7abed950

Branch: refs/heads/master
Commit: 7abed950dba6afae30c906236fe8a88ff5b87964
Parents: 94a66d5
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Mar 3 18:35:51 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Mar 4 11:00:29 2015 +0100

----------------------------------------------------------------------
 .../YARNSessionCapacitySchedulerITCase.java     |  4 ++
 .../flink/yarn/YARNSessionFIFOITCase.java       |  9 ++++
 .../org/apache/flink/yarn/YarnTestBase.java     | 54 +++++++++++++++++++-
 3 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7abed950/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index e4f82cd..7da355b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -55,6 +55,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 						"-jm", "512",
 						"-tm", "1024", "-qu", "qa-team"},
 				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+
+		ensureNoExceptionsInLogFiles();
 	}
 
 
@@ -71,5 +73,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-tm", "1024",
 				"-qu", "doesntExist"}, "to unknown queue: doesntExist", RunTypes.YARN_SESSION);
 		checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");
+
+		ensureNoExceptionsInLogFiles();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7abed950/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index a365fbf..d5f301b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -68,8 +68,10 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 						"-tm", "1024"},
 				"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
 		LOG.info("Finished testClientStartup()");
+		ensureNoExceptionsInLogFiles();
 	}
 
+
 	/**
 	 * Test querying the YARN cluster.
 	 *
@@ -80,6 +82,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		LOG.info("Starting testQueryCluster()");
 		runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
 		LOG.info("Finished testQueryCluster()");
+		ensureNoExceptionsInLogFiles();
 	}
 
 	/**
@@ -95,6 +98,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 				"-tm", "1024",
 				"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
 		LOG.info("Finished testNonexistingQueue()");
+		ensureNoExceptionsInLogFiles();
 	}
 
 	/**
@@ -113,6 +117,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 				"-tm", "1024"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends on the speed of the test hardware
 		LOG.info("Finished testMoreNodesThanAvailable()");
 		checkForLogString("This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.");
+		ensureNoExceptionsInLogFiles();
 	}
 
 	/**
@@ -171,6 +176,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		LOG.info("Finished testfullAlloc()");
 		checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
 				"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
+		ensureNoExceptionsInLogFiles();
 	}
 
 	/**
@@ -188,6 +194,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 				"-yjm", "512",
 				"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND);
 		LOG.info("Finished perJobYarnCluster()");
+		ensureNoExceptionsInLogFiles();
 	}
 
 	/**
@@ -244,6 +251,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		// shutdown cluster
 		yarnCluster.shutdown();
 		LOG.info("Finished testJavaAPI()");
+
+		ensureNoExceptionsInLogFiles();
 	}
 
 	public boolean ignoreOnTravis() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7abed950/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index a557f02..2dacf57 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileWriter;
 import java.io.FilenameFilter;
 import java.io.IOException;
@@ -49,6 +50,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Scanner;
 
 
 /**
@@ -64,6 +66,8 @@ public abstract class YarnTestBase {
 	private final static PrintStream originalStdout = System.out;
 	private final static PrintStream originalStderr = System.err;
 
+	private final static String TEST_CLUSTER_NAME = "flink-yarn-tests";
+
 
 	// Temp directory which is deleted after the unit test.
 	private static TemporaryFolder tmp = new TemporaryFolder();
@@ -216,6 +220,54 @@ public abstract class YarnTestBase {
 		return yarnSiteXML;
 	}
 
+	/**
+	 * This method checks the written TaskManager and JobManager log files
+	 * for exceptions.
+	 */
+	public static void ensureNoExceptionsInLogFiles() {
+		File cwd = new File("target/"+TEST_CLUSTER_NAME);
+		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists());
+		Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
+		System.out.println("cwd = "+cwd.getAbsolutePath());
+		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
+			@Override
+			public boolean accept(File dir, String name) {
+				File f = new File(dir.getAbsolutePath()+ "/" + name);
+				// scan each file for 'Exception'.
+				Scanner scanner =  null;
+				try {
+					scanner = new Scanner(f);
+				} catch (FileNotFoundException e) {
+					Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath());
+				}
+				while (scanner.hasNextLine()) {
+					final String lineFromFile = scanner.nextLine();
+					if(lineFromFile.contains("Exception")) {
+						return true;
+					}
+				}
+				return false;
+			}
+		});
+		if(foundFile != null) {
+			Scanner scanner =  null;
+			try {
+				scanner = new Scanner(foundFile);
+			} catch (FileNotFoundException e) {
+				Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath());
+			}
+			LOG.warn("Found a file with an exception. Printing contents:");
+			while (scanner.hasNextLine()) {
+				LOG.warn("LINE: "+scanner.nextLine());
+			}
+			Assert.fail("Found a file "+foundFile+" with an exception");
+		}
+	}
+
+	public static void main(String[] args) {
+		ensureNoExceptionsInLogFiles();
+	}
+
 	public static void startYARNWithConfig(Configuration conf) {
 		flinkUberjar = findFile(".", new RootDirFilenameFilter());
 		Assert.assertNotNull(flinkUberjar);
@@ -228,7 +280,7 @@ public abstract class YarnTestBase {
 		try {
 			LOG.info("Starting up MiniYARN cluster");
 			if (yarnCluster == null) {
-				yarnCluster = new MiniYARNCluster(YarnTestBase.class.getName(), 2, 1, 1);
+				yarnCluster = new MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1);
 
 				yarnCluster.init(conf);
 				yarnCluster.start();