You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/20 18:38:09 UTC

[1/2] flink git commit: [FLINK-2865] remove upper direct memory size bound

Repository: flink
Updated Branches:
  refs/heads/master a6890b284 -> e71196972


[FLINK-2865] remove upper direct memory size bound

- set the upper bound to Long.MAX_VALUE

For YARN, we set it to the calculated maximum container size (no need to fix).


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c44d93d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c44d93d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c44d93d

Branch: refs/heads/master
Commit: 6c44d93d0a9da725ef8b1ad2a94889f79321db73
Parents: a6890b2
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Oct 19 16:17:07 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Oct 20 10:10:42 2015 +0200

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/taskmanager.sh | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c44d93d/flink-dist/src/main/flink-bin/bin/taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/taskmanager.sh b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
index fd29a93..f69dd1c 100755
--- a/flink-dist/src/main/flink-bin/bin/taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/bin/taskmanager.sh
@@ -59,8 +59,9 @@ if [[ $STARTSTOP == "start" ]]; then
     if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
 
         TM_HEAP_SIZE=${FLINK_TM_HEAP}
-        # This is an upper bound, much less direct memory will be used
-        TM_MAX_OFFHEAP_SIZE=${FLINK_TM_HEAP}
+        # Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
+        #
+        TM_MAX_OFFHEAP_SIZE="8388607T"
 
         if [[ "${STREAMINGMODE}" == "batch" ]] && useOffHeapMemory; then
             if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
@@ -88,7 +89,7 @@ if [[ $STARTSTOP == "start" ]]; then
             fi
         fi
 
-        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}M"
+        export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
 
     fi
 


[2/2] flink git commit: [FLINK-2873] detect & serve the job manager log files correctly

Posted by mx...@apache.org.
[FLINK-2873] detect & serve the job manager log files correctly


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7119697
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7119697
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7119697

Branch: refs/heads/master
Commit: e71196972c6acd124bd5ff36ad57dc493cf35e93
Parents: 6c44d93
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Oct 20 17:07:40 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Oct 20 17:07:40 2015 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  5 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 53 +++-------------
 .../files/StaticFileServerHandler.java          | 15 +----
 .../webmonitor/WebRuntimeMonitorITCase.java     | 13 ++--
 .../runtime/webmonitor/WebMonitorUtils.java     | 63 ++++++++++++++++++++
 .../apache/flink/test/util/TestBaseUtils.java   |  5 +-
 .../flink/test/web/WebFrontendITCase.java       | 10 ++--
 .../flink/yarn/ApplicationMasterBase.scala      |  9 +--
 .../org/apache/flink/yarn/YarnJobManager.scala  |  4 +-
 9 files changed, 96 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index be730a0..fc2087a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -297,7 +297,10 @@ public final class ConfigConstants {
 	 */
 	public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";
 
-	public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath";
+	/**
+	 * The log file location (may be in /log for standalone but under log directory when using YARN)
+	 */
+	public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path";
 
 
 	// ------------------------------ Web Client ------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 40d9f2d..e69165d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
-import com.google.common.io.PatternFilenameFilter;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -32,9 +31,7 @@ import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Router;
 import io.netty.handler.stream.ChunkedWriteHandler;
 import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -65,10 +62,8 @@ import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -91,12 +86,6 @@ public class WebRuntimeMonitor implements WebMonitor {
 	/** Logger for web frontend startup / shutdown messages */
 	private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
 
-	/** Job manager's log file pattern */
-	public static final FilenameFilter LOG_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.log");
-
-	/** Job manager's stdout file pattern */
-	public static final FilenameFilter STDOUT_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.out");
-
 	// ------------------------------------------------------------------------
 
 	/** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
@@ -136,39 +125,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 		String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
 		webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
 		LOG.info("Using directory {} for the web interface files", webRootDir);
-		
-		// figure out where our logs are
-		final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
-		final String defaultLogDirectory = flinkRoot + "/log";
-		final String logDirectories = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, defaultLogDirectory);
-
-		// find out which directory holds the path for log and stdout
-		final ArrayList<String> logPaths = new ArrayList<>();
-		final ArrayList<String> outPaths = new ArrayList<>();
-
-		// yarn allows for multiple log directories. Search in all.
-		for(String paths: logDirectories.split(",")) {
-			File dir = new File(paths);
-			if (dir.exists() && dir.isDirectory() && dir.canRead()) {
-				if (dir.listFiles(LOG_FILE_PATTERN).length == 1) {
-					logPaths.add(paths);
-				}
-				if (dir.listFiles(STDOUT_FILE_PATTERN).length == 1) {
-					outPaths.add(paths);
-				}
-			}
-		}
 
-		// we don't want any ambiguities. There must be only one log and out file.
-		if(logPaths.size() != 1 || outPaths.size() != 1) {
-			throw new IllegalConfigurationException("The path to the log and out files (" +
-					logDirectories  + ") is not valid.");
-		}
+		final WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(config);
 
-		final File logDir = new File(logPaths.get(0));
-		final File outDir = new File(outPaths.get(0));
-		LOG.info("Serving job manager logs from {}", logDir.getAbsolutePath());
-		LOG.info("Serving job manager stdout from {}", outDir.getAbsolutePath());
+		LOG.info("Serving job manager log from {}", logFiles.logFile.getAbsolutePath());
+		LOG.info("Serving job manager stdout from {}", logFiles.stdOutFile.getAbsolutePath());
 
 		// port configuration
 		this.configuredPort = cfg.getWebFrontendPort();
@@ -190,7 +151,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			// the overview - how many task managers, slots, free slots, ...
 			.GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))
 
-			// job manager configuration, log and stdout
+			// job manager configuration
 			.GET("/jobmanager/config", handler(new JobManagerConfigHandler(config)))
 
 			// overview over jobs
@@ -220,8 +181,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
 			.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
 
-			.GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logDir))
-			.GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, outDir))
+			// log and stdout
+			.GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile))
+			.GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))
+
 			// this handler serves all the static contents
 			.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 02dd81e..df330fd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -45,7 +45,6 @@ import io.netty.handler.codec.http.router.KeepAliveWrite;
 import io.netty.handler.codec.http.router.Routed;
 import io.netty.util.CharsetUtil;
 import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
 import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
 import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
 import org.slf4j.Logger;
@@ -60,7 +59,6 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.FilenameFilter;
 import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.text.ParseException;
@@ -166,11 +164,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 				requestPath = requestPath + "index.html";
 			}
 
-		// in case the files being accessed are logs or stdout files, find appropriate paths.
-		if (requestPath.equals("/jobmanager/log")) {
-			requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.LOG_FILE_PATTERN);
-		} else if (requestPath.equals("/jobmanager/stdout")) {
-			requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.STDOUT_FILE_PATTERN);
+			// in case the files being accessed are logs or stdout files, find appropriate paths.
+			if (requestPath.equals("/jobmanager/log") || requestPath.equals("/jobmanager/stdout")) {
+				requestPath = "";
 			}
 
 			Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
@@ -371,9 +367,4 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
 		String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType();
 		response.headers().set(CONTENT_TYPE, mimeFinal);
 	}
-
-	private static String getFileName(File directory, FilenameFilter pattern) {
-		File[] files = directory.listFiles(pattern);
-		return files.length == 0 ? null : files[0].getName();
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 68b00dc..a3f152d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -47,6 +47,7 @@ import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Scanner;
@@ -84,12 +85,12 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 			ActorRef jmActor = flink.jobManagerActors().get().head();
 
 			File logDir = temporaryFolder.newFolder("log");
-			Files.createFile(new File(logDir, "jobmanager.log").toPath());
+			Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
 			Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
 			Configuration monitorConfig = new Configuration();
 			monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-			monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
+			monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 
 			// Needs to match the leader address from the leader retrieval service
 			String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
@@ -149,11 +150,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				temporaryFolder.getRoot().getPath());
 
 			File logDir = temporaryFolder.newFolder();
-			Files.createFile(new File(logDir, "jobmanager.log").toPath());
+			Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
 			Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
 			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
+			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 
 			for (int i = 0; i < jobManagerSystem.length; i++) {
 				jobManagerSystem[i] = AkkaUtils.createActorSystem(new Configuration(),
@@ -289,12 +290,12 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 		try (TestingServer zooKeeper = new TestingServer()) {
 
 			File logDir = temporaryFolder.newFolder();
-			Files.createFile(new File(logDir, "jobmanager.log").toPath());
+			Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
 			Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
 			final Configuration config = new Configuration();
 			config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
-			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
+			config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 			config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
 			config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 4fca270..181d6d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -33,6 +35,7 @@ import org.codehaus.jettison.json.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.util.HashMap;
@@ -48,6 +51,66 @@ public final class WebMonitorUtils {
 	private static final Logger LOG = LoggerFactory.getLogger(WebMonitorUtils.class);
 
 	/**
+	 * Singleton to hold the log and stdout file
+	 */
+	public static class LogFiles {
+
+		private static LogFiles INSTANCE;
+
+		public final File logFile;
+		public final File stdOutFile;
+
+		private LogFiles(String logFile) {
+			this.logFile = checkFileLocation(logFile);
+			String stdOutFile = logFile.replaceFirst("\\.log$", ".out");
+			this.stdOutFile = checkFileLocation(stdOutFile);;
+		}
+
+		/**
+		 * Verify log file location
+		 * @param logFilePath Path to log file
+		 * @return File or null if not a valid log file
+		 */
+		private static File checkFileLocation (String logFilePath) {
+			File logFile = new File(logFilePath);
+			if (logFile.exists() && logFile.canRead()) {
+				return logFile;
+			} else {
+				throw new IllegalConfigurationException("Job manager log file was supposed to be at " +
+						logFile.getAbsolutePath() + " but it does not exist or is not readable.");
+			}
+		}
+
+		/**
+		 * Finds the Flink log directory using log.file Java property that is set during startup.
+		 */
+		public static LogFiles find(Configuration config) {
+			if (INSTANCE == null) {
+
+				/** Figure out log file location based on 'log.file' VM argument **/
+				final String logEnv = "log.file";
+				String logFilePath = System.getProperty(logEnv);
+
+				if (logFilePath == null) {
+					LOG.warn("Log file environment variable '{}' is not set.", logEnv);
+					logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
+				}
+
+				if (logFilePath == null) {
+					throw new IllegalConfigurationException("JobManager log file not found. " +
+							"Can't serve log files. Log file location couldn't be determined via the " +
+							logEnv + " environment variable or the config constant " +
+							ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
+				}
+
+				INSTANCE = new LogFiles(logFilePath);
+			}
+
+			return INSTANCE;
+		}
+	}
+
+	/**
 	 * Starts the web runtime monitor. Because the actual implementation of the runtime monitor is
 	 * in another project, we load the runtime monitor dynamically.
 	 * <p/>

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index ce3adce..b2bfd6b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -59,6 +59,7 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -131,7 +132,7 @@ public class TestBaseUtils extends TestLogger {
 		logDir = File.createTempFile("TestBaseUtils-logdir", null);
 		Assert.assertTrue("Unable to delete temp file", logDir.delete());
 		Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
-		Files.createFile(new File(logDir, "jobmanager.log").toPath());
+		Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
 		Files.createFile(new File(logDir, "jobmanager.out").toPath());
 
 		config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
@@ -141,7 +142,7 @@ public class TestBaseUtils extends TestLogger {
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
 
 		config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
-		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
+		config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
 
 		ForkableFlinkMiniCluster cluster =  new ForkableFlinkMiniCluster(config, singleActorSystem, mode);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index bea96d3..9c37a95 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -115,14 +115,13 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 	@Test
 	public void getLogAndStdoutFiles() {
 		try {
-			String logPath = cluster.configuration().getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
-			Assert.assertNotNull(logPath);
+			WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(cluster.configuration());
 
-			FileUtils.writeStringToFile(new File(logPath, "jobmanager.log"), "job manager log");
+			FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
 			String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log");
 			Assert.assertTrue(logs.contains("job manager log"));
 
-			FileUtils.writeStringToFile(new File(logPath, "jobmanager.out"), "job manager out");
+			FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
 			logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
 			Assert.assertTrue(logs.contains("job manager out"));
 		}catch(Throwable e) {
@@ -138,8 +137,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
 			JSONArray array = new JSONArray(config);
 
 			Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(array);
-			Assert.assertEquals(logDir.toString(),
-					conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY));
+			Assert.assertTrue(conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY).startsWith(logDir.toString()));
 			Assert.assertEquals(
 					cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
 					conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index f3892dd..73fc951 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -93,8 +93,6 @@ abstract class ApplicationMasterBase {
       val currDir = env.get(Environment.PWD.key())
       require(currDir != null, "Current directory unknown.")
 
-      val logDirs = env.get(Environment.LOG_DIRS.key())
-
       val streamingMode = if(ApplicationMasterBase.hasStreamingMode(env)) {
         log.info("Starting ApplicationMaster/JobManager in streaming mode")
         StreamingMode.STREAMING
@@ -119,8 +117,7 @@ abstract class ApplicationMasterBase {
 
       // if a web monitor shall be started, set the port to random binding
       if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
-        config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
-        config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
+        config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
       }
 
       val (actorSystem, jmActor, archiveActor, webMonitor) =
@@ -147,7 +144,7 @@ abstract class ApplicationMasterBase {
 
       // generate configuration file for TaskManagers
       generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, akkaHostname,
-        jobManagerPort, webServerPort, logDirs, slots, taskManagerCount,
+        jobManagerPort, webServerPort, slots, taskManagerCount,
         dynamicPropertiesEncodedString)
 
       val hadoopConfig = new YarnConfiguration();
@@ -184,7 +181,6 @@ abstract class ApplicationMasterBase {
     ownHostname: String,
     jobManagerPort: Int,
     jobManagerWebPort: Int,
-    logDirs: String,
     slots: Int,
     taskManagerCount: Int,
     dynamicPropertiesEncodedString: String)
@@ -202,7 +198,6 @@ abstract class ApplicationMasterBase {
     output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: $ownHostname")
     output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: $jobManagerPort")
 
-    output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: $logDirs")
     output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: $jobManagerWebPort")
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 4ada21e..c8a9480 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -691,8 +691,8 @@ class YarnJobManager(
     }
 
     tmCommand ++= s" ${taskManagerRunnerClass.getName} --configDir . 1> " +
-      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 2> " +
-      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log"
+      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.out 2> " +
+      s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.err"
 
     tmCommand ++= " --streamingMode"
     if(streamingMode) {