You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/03/04 11:52:42 UTC
flink git commit: [flink-yarn-tests] Add check for exceptions in the flink logs.
Repository: flink
Updated Branches:
refs/heads/master 94a66d570 -> 7abed950d
[flink-yarn-tests] Add check for exceptions in the flink logs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7abed950
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7abed950
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7abed950
Branch: refs/heads/master
Commit: 7abed950dba6afae30c906236fe8a88ff5b87964
Parents: 94a66d5
Author: Robert Metzger <rm...@apache.org>
Authored: Tue Mar 3 18:35:51 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Wed Mar 4 11:00:29 2015 +0100
----------------------------------------------------------------------
.../YARNSessionCapacitySchedulerITCase.java | 4 ++
.../flink/yarn/YARNSessionFIFOITCase.java | 9 ++++
.../org/apache/flink/yarn/YarnTestBase.java | 54 +++++++++++++++++++-
3 files changed, 66 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7abed950/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index e4f82cd..7da355b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -55,6 +55,8 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"-jm", "512",
"-tm", "1024", "-qu", "qa-team"},
"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
+
+ ensureNoExceptionsInLogFiles();
}
@@ -71,5 +73,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
"-tm", "1024",
"-qu", "doesntExist"}, "to unknown queue: doesntExist", RunTypes.YARN_SESSION);
checkForLogString("The specified queue 'doesntExist' does not exist. Available queues: default, qa-team");
+
+ ensureNoExceptionsInLogFiles();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7abed950/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index a365fbf..d5f301b 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -68,8 +68,10 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"-tm", "1024"},
"Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
LOG.info("Finished testClientStartup()");
+ ensureNoExceptionsInLogFiles();
}
+
/**
* Test querying the YARN cluster.
*
@@ -80,6 +82,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Starting testQueryCluster()");
runWithArgs(new String[] {"-q"}, "Summary: totalMemory 8192 totalCores 1332", RunTypes.YARN_SESSION); // we have 666*2 cores.
LOG.info("Finished testQueryCluster()");
+ ensureNoExceptionsInLogFiles();
}
/**
@@ -95,6 +98,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"-tm", "1024",
"-qu", "doesntExist"}, "Number of connected TaskManagers changed to 1. Slots available: 1", RunTypes.YARN_SESSION);
LOG.info("Finished testNonexistingQueue()");
+ ensureNoExceptionsInLogFiles();
}
/**
@@ -113,6 +117,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"-tm", "1024"}, "Number of connected TaskManagers changed to", RunTypes.YARN_SESSION); // the number of TMs depends on the speed of the test hardware
LOG.info("Finished testMoreNodesThanAvailable()");
checkForLogString("This YARN session requires 10752MB of memory in the cluster. There are currently only 8192MB available.");
+ ensureNoExceptionsInLogFiles();
}
/**
@@ -171,6 +176,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
LOG.info("Finished testfullAlloc()");
checkForLogString("There is not enough memory available in the YARN cluster. The TaskManager(s) require 3840MB each. NodeManagers available: [4096, 4096]\n" +
"After allocating the JobManager (512MB) and (1/2) TaskManagers, the following NodeManagers are available: [3584, 256]");
+ ensureNoExceptionsInLogFiles();
}
/**
@@ -188,6 +194,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
"-yjm", "512",
"-ytm", "1024", exampleJarLocation.getAbsolutePath()}, "Job execution switched to status FINISHED.", RunTypes.CLI_FRONTEND);
LOG.info("Finished perJobYarnCluster()");
+ ensureNoExceptionsInLogFiles();
}
/**
@@ -244,6 +251,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
// shutdown cluster
yarnCluster.shutdown();
LOG.info("Finished testJavaAPI()");
+
+ ensureNoExceptionsInLogFiles();
}
public boolean ignoreOnTravis() {
http://git-wip-us.apache.org/repos/asf/flink/blob/7abed950/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index a557f02..2dacf57 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -49,6 +50,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Scanner;
/**
@@ -64,6 +66,8 @@ public abstract class YarnTestBase {
private final static PrintStream originalStdout = System.out;
private final static PrintStream originalStderr = System.err;
+ private final static String TEST_CLUSTER_NAME = "flink-yarn-tests";
+
// Temp directory which is deleted after the unit test.
private static TemporaryFolder tmp = new TemporaryFolder();
@@ -216,6 +220,54 @@ public abstract class YarnTestBase {
return yarnSiteXML;
}
+ /**
+ * This method checks the written TaskManager and JobManager log files
+ * for exceptions.
+ */
+ public static void ensureNoExceptionsInLogFiles() {
+ File cwd = new File("target/"+TEST_CLUSTER_NAME);
+ Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to exist", cwd.exists());
+ Assert.assertTrue("Expecting directory "+cwd.getAbsolutePath()+" to be a directory", cwd.isDirectory());
+ System.out.println("cwd = "+cwd.getAbsolutePath());
+ File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ File f = new File(dir.getAbsolutePath()+ "/" + name);
+ // scan each file for 'Exception'.
+ Scanner scanner = null;
+ try {
+ scanner = new Scanner(f);
+ } catch (FileNotFoundException e) {
+ Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+f.getAbsolutePath());
+ }
+ while (scanner.hasNextLine()) {
+ final String lineFromFile = scanner.nextLine();
+ if(lineFromFile.contains("Exception")) {
+ return true;
+ }
+ }
+ return false;
+ }
+ });
+ if(foundFile != null) {
+ Scanner scanner = null;
+ try {
+ scanner = new Scanner(foundFile);
+ } catch (FileNotFoundException e) {
+ Assert.fail("Unable to locate file: "+e.getMessage()+" file: "+foundFile.getAbsolutePath());
+ }
+ LOG.warn("Found a file with an exception. Printing contents:");
+ while (scanner.hasNextLine()) {
+ LOG.warn("LINE: "+scanner.nextLine());
+ }
+ Assert.fail("Found a file "+foundFile+" with an exception");
+ }
+ }
+
+ public static void main(String[] args) {
+ ensureNoExceptionsInLogFiles();
+ }
+
public static void startYARNWithConfig(Configuration conf) {
flinkUberjar = findFile(".", new RootDirFilenameFilter());
Assert.assertNotNull(flinkUberjar);
@@ -228,7 +280,7 @@ public abstract class YarnTestBase {
try {
LOG.info("Starting up MiniYARN cluster");
if (yarnCluster == null) {
- yarnCluster = new MiniYARNCluster(YarnTestBase.class.getName(), 2, 1, 1);
+ yarnCluster = new MiniYARNCluster(TEST_CLUSTER_NAME, 2, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();