You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/10/20 18:38:10 UTC
[2/2] flink git commit: [FLINK-2873] detect & serve the job manager
log files correctly
[FLINK-2873] detect & serve the job manager log files correctly
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7119697
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7119697
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7119697
Branch: refs/heads/master
Commit: e71196972c6acd124bd5ff36ad57dc493cf35e93
Parents: 6c44d93
Author: Maximilian Michels <mx...@apache.org>
Authored: Tue Oct 20 17:07:40 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Tue Oct 20 17:07:40 2015 +0200
----------------------------------------------------------------------
.../flink/configuration/ConfigConstants.java | 5 +-
.../runtime/webmonitor/WebRuntimeMonitor.java | 53 +++-------------
.../files/StaticFileServerHandler.java | 15 +----
.../webmonitor/WebRuntimeMonitorITCase.java | 13 ++--
.../runtime/webmonitor/WebMonitorUtils.java | 63 ++++++++++++++++++++
.../apache/flink/test/util/TestBaseUtils.java | 5 +-
.../flink/test/web/WebFrontendITCase.java | 10 ++--
.../flink/yarn/ApplicationMasterBase.scala | 9 +--
.../org/apache/flink/yarn/YarnJobManager.scala | 4 +-
9 files changed, 96 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index be730a0..fc2087a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -297,7 +297,10 @@ public final class ConfigConstants {
*/
public static final String JOB_MANAGER_WEB_ARCHIVE_COUNT = "jobmanager.web.history";
- public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.logpath";
+ /**
+ * The log file location (may be in /log for standalone but under log directory when using YARN)
+ */
+ public static final String JOB_MANAGER_WEB_LOG_PATH_KEY = "jobmanager.web.log.path";
// ------------------------------ Web Client ------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 40d9f2d..e69165d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
-import com.google.common.io.PatternFilenameFilter;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -32,9 +31,7 @@ import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Router;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -65,10 +62,8 @@ import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import java.io.File;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -91,12 +86,6 @@ public class WebRuntimeMonitor implements WebMonitor {
/** Logger for web frontend startup / shutdown messages */
private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
- /** Job manager's log file pattern */
- public static final FilenameFilter LOG_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.log");
-
- /** Job manager's stdout file pattern */
- public static final FilenameFilter STDOUT_FILE_PATTERN = new PatternFilenameFilter(".*jobmanager[^\\.]*\\.out");
-
// ------------------------------------------------------------------------
/** Guarding concurrent modifications to the server channel pipeline during startup and shutdown */
@@ -136,39 +125,11 @@ public class WebRuntimeMonitor implements WebMonitor {
String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
LOG.info("Using directory {} for the web interface files", webRootDir);
-
- // figure out where our logs are
- final String flinkRoot = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, null);
- final String defaultLogDirectory = flinkRoot + "/log";
- final String logDirectories = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, defaultLogDirectory);
-
- // find out which directory holds the path for log and stdout
- final ArrayList<String> logPaths = new ArrayList<>();
- final ArrayList<String> outPaths = new ArrayList<>();
-
- // yarn allows for multiple log directories. Search in all.
- for(String paths: logDirectories.split(",")) {
- File dir = new File(paths);
- if (dir.exists() && dir.isDirectory() && dir.canRead()) {
- if (dir.listFiles(LOG_FILE_PATTERN).length == 1) {
- logPaths.add(paths);
- }
- if (dir.listFiles(STDOUT_FILE_PATTERN).length == 1) {
- outPaths.add(paths);
- }
- }
- }
- // we don't want any ambiguities. There must be only one log and out file.
- if(logPaths.size() != 1 || outPaths.size() != 1) {
- throw new IllegalConfigurationException("The path to the log and out files (" +
- logDirectories + ") is not valid.");
- }
+ final WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(config);
- final File logDir = new File(logPaths.get(0));
- final File outDir = new File(outPaths.get(0));
- LOG.info("Serving job manager logs from {}", logDir.getAbsolutePath());
- LOG.info("Serving job manager stdout from {}", outDir.getAbsolutePath());
+ LOG.info("Serving job manager log from {}", logFiles.logFile.getAbsolutePath());
+ LOG.info("Serving job manager stdout from {}", logFiles.stdOutFile.getAbsolutePath());
// port configuration
this.configuredPort = cfg.getWebFrontendPort();
@@ -190,7 +151,7 @@ public class WebRuntimeMonitor implements WebMonitor {
// the overview - how many task managers, slots, free slots, ...
.GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))
- // job manager configuration, log and stdout
+ // job manager configuration
.GET("/jobmanager/config", handler(new JobManagerConfigHandler(config)))
// overview over jobs
@@ -220,8 +181,10 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
.GET("/taskmanagers/:" + TaskManagersHandler.TASK_MANAGER_ID_KEY, handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))
- .GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logDir))
- .GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, outDir))
+ // log and stdout
+ .GET("/jobmanager/log", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.logFile))
+ .GET("/jobmanager/stdout", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, logFiles.stdOutFile))
+
// this handler serves all the static contents
.GET("/:*", new StaticFileServerHandler(retriever, jobManagerAddressPromise.future(), timeout, webRootDir));
http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
index 02dd81e..df330fd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
@@ -45,7 +45,6 @@ import io.netty.handler.codec.http.router.KeepAliveWrite;
import io.netty.handler.codec.http.router.Routed;
import io.netty.util.CharsetUtil;
import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.slf4j.Logger;
@@ -60,7 +59,6 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.FilenameFilter;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.text.ParseException;
@@ -166,11 +164,9 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
requestPath = requestPath + "index.html";
}
- // in case the files being accessed are logs or stdout files, find appropriate paths.
- if (requestPath.equals("/jobmanager/log")) {
- requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.LOG_FILE_PATTERN);
- } else if (requestPath.equals("/jobmanager/stdout")) {
- requestPath = "/" + getFileName(rootPath, WebRuntimeMonitor.STDOUT_FILE_PATTERN);
+ // in case the files being accessed are logs or stdout files, find appropriate paths.
+ if (requestPath.equals("/jobmanager/log") || requestPath.equals("/jobmanager/stdout")) {
+ requestPath = "";
}
Option<Tuple2<ActorGateway, Integer>> jobManager = retriever.getJobManagerGatewayAndWebPort();
@@ -371,9 +367,4 @@ public class StaticFileServerHandler extends SimpleChannelInboundHandler<Routed>
String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType();
response.headers().set(CONTENT_TYPE, mimeFinal);
}
-
- private static String getFileName(File directory, FilenameFilter pattern) {
- File[] files = directory.listFiles(pattern);
- return files.length == 0 ? null : files[0].getName();
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 68b00dc..a3f152d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -47,6 +47,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.File;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
@@ -84,12 +85,12 @@ public class WebRuntimeMonitorITCase extends TestLogger {
ActorRef jmActor = flink.jobManagerActors().get().head();
File logDir = temporaryFolder.newFolder("log");
- Files.createFile(new File(logDir, "jobmanager.log").toPath());
+ Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());
Configuration monitorConfig = new Configuration();
monitorConfig.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
- monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
+ monitorConfig.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
// Needs to match the leader address from the leader retrieval service
String jobManagerAddress = AkkaUtils.getAkkaURL(jmActorSystem, jmActor);
@@ -149,11 +150,11 @@ public class WebRuntimeMonitorITCase extends TestLogger {
temporaryFolder.getRoot().getPath());
File logDir = temporaryFolder.newFolder();
- Files.createFile(new File(logDir, "jobmanager.log").toPath());
+ Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
- config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
for (int i = 0; i < jobManagerSystem.length; i++) {
jobManagerSystem[i] = AkkaUtils.createActorSystem(new Configuration(),
@@ -289,12 +290,12 @@ public class WebRuntimeMonitorITCase extends TestLogger {
try (TestingServer zooKeeper = new TestingServer()) {
File logDir = temporaryFolder.newFolder();
- Files.createFile(new File(logDir, "jobmanager.log").toPath());
+ Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
- config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.getAbsolutePath());
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
config.setString(ConfigConstants.RECOVERY_MODE, "ZOOKEEPER");
config.setString(ConfigConstants.ZOOKEEPER_QUORUM_KEY, zooKeeper.getConnectString());
http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 4fca270..181d6d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -19,7 +19,9 @@
package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -33,6 +35,7 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
@@ -48,6 +51,66 @@ public final class WebMonitorUtils {
private static final Logger LOG = LoggerFactory.getLogger(WebMonitorUtils.class);
/**
+ * Singleton to hold the log and stdout file
+ */
+ public static class LogFiles {
+
+ private static LogFiles INSTANCE;
+
+ public final File logFile;
+ public final File stdOutFile;
+
+ private LogFiles(String logFile) {
+ this.logFile = checkFileLocation(logFile);
+ String stdOutFile = logFile.replaceFirst("\\.log$", ".out");
+ this.stdOutFile = checkFileLocation(stdOutFile);;
+ }
+
+ /**
+ * Verify log file location
+ * @param logFilePath Path to log file
+ * @return File or null if not a valid log file
+ */
+ private static File checkFileLocation (String logFilePath) {
+ File logFile = new File(logFilePath);
+ if (logFile.exists() && logFile.canRead()) {
+ return logFile;
+ } else {
+ throw new IllegalConfigurationException("Job manager log file was supposed to be at " +
+ logFile.getAbsolutePath() + " but it does not exist or is not readable.");
+ }
+ }
+
+ /**
+ * Finds the Flink log directory using log.file Java property that is set during startup.
+ */
+ public static LogFiles find(Configuration config) {
+ if (INSTANCE == null) {
+
+ /** Figure out log file location based on 'log.file' VM argument **/
+ final String logEnv = "log.file";
+ String logFilePath = System.getProperty(logEnv);
+
+ if (logFilePath == null) {
+ LOG.warn("Log file environment variable '{}' is not set.", logEnv);
+ logFilePath = config.getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
+ }
+
+ if (logFilePath == null) {
+ throw new IllegalConfigurationException("JobManager log file not found. " +
+ "Can't serve log files. Log file location couldn't be determined via the " +
+ logEnv + " environment variable or the config constant " +
+ ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY);
+ }
+
+ INSTANCE = new LogFiles(logFilePath);
+ }
+
+ return INSTANCE;
+ }
+ }
+
+ /**
* Starts the web runtime monitor. Because the actual implementation of the runtime monitor is
* in another project, we load the runtime monitor dynamically.
* <p/>
http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index ce3adce..b2bfd6b 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -59,6 +59,7 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -131,7 +132,7 @@ public class TestBaseUtils extends TestLogger {
logDir = File.createTempFile("TestBaseUtils-logdir", null);
Assert.assertTrue("Unable to delete temp file", logDir.delete());
Assert.assertTrue("Unable to create temp directory", logDir.mkdir());
- Files.createFile(new File(logDir, "jobmanager.log").toPath());
+ Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath());
Files.createFile(new File(logDir, "jobmanager.out").toPath());
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, TASK_MANAGER_MEMORY_SIZE);
@@ -141,7 +142,7 @@ public class TestBaseUtils extends TestLogger {
config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, DEFAULT_AKKA_STARTUP_TIMEOUT);
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081);
- config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDir.toString());
+ config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString());
ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(config, singleActorSystem, mode);
http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
index bea96d3..9c37a95 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/web/WebFrontendITCase.java
@@ -115,14 +115,13 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
@Test
public void getLogAndStdoutFiles() {
try {
- String logPath = cluster.configuration().getString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, null);
- Assert.assertNotNull(logPath);
+ WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find(cluster.configuration());
- FileUtils.writeStringToFile(new File(logPath, "jobmanager.log"), "job manager log");
+ FileUtils.writeStringToFile(logFiles.logFile, "job manager log");
String logs = getFromHTTP("http://localhost:" + port + "/jobmanager/log");
Assert.assertTrue(logs.contains("job manager log"));
- FileUtils.writeStringToFile(new File(logPath, "jobmanager.out"), "job manager out");
+ FileUtils.writeStringToFile(logFiles.stdOutFile, "job manager out");
logs = getFromHTTP("http://localhost:" + port + "/jobmanager/stdout");
Assert.assertTrue(logs.contains("job manager out"));
}catch(Throwable e) {
@@ -138,8 +137,7 @@ public class WebFrontendITCase extends MultipleProgramsTestBase {
JSONArray array = new JSONArray(config);
Map<String, String> conf = WebMonitorUtils.fromKeyValueJsonArray(array);
- Assert.assertEquals(logDir.toString(),
- conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY));
+ Assert.assertTrue(conf.get(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY).startsWith(logDir.toString()));
Assert.assertEquals(
cluster.configuration().getString("taskmanager.numberOfTaskSlots", null),
conf.get(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS));
http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
index f3892dd..73fc951 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala
@@ -93,8 +93,6 @@ abstract class ApplicationMasterBase {
val currDir = env.get(Environment.PWD.key())
require(currDir != null, "Current directory unknown.")
- val logDirs = env.get(Environment.LOG_DIRS.key())
-
val streamingMode = if(ApplicationMasterBase.hasStreamingMode(env)) {
log.info("Starting ApplicationMaster/JobManager in streaming mode")
StreamingMode.STREAMING
@@ -119,8 +117,7 @@ abstract class ApplicationMasterBase {
// if a web monitor shall be started, set the port to random binding
if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) >= 0) {
- config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
- config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0); // set port to 0.
+ config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0);
}
val (actorSystem, jmActor, archiveActor, webMonitor) =
@@ -147,7 +144,7 @@ abstract class ApplicationMasterBase {
// generate configuration file for TaskManagers
generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, akkaHostname,
- jobManagerPort, webServerPort, logDirs, slots, taskManagerCount,
+ jobManagerPort, webServerPort, slots, taskManagerCount,
dynamicPropertiesEncodedString)
val hadoopConfig = new YarnConfiguration();
@@ -184,7 +181,6 @@ abstract class ApplicationMasterBase {
ownHostname: String,
jobManagerPort: Int,
jobManagerWebPort: Int,
- logDirs: String,
slots: Int,
taskManagerCount: Int,
dynamicPropertiesEncodedString: String)
@@ -202,7 +198,6 @@ abstract class ApplicationMasterBase {
output.println(s"${ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY}: $ownHostname")
output.println(s"${ConfigConstants.JOB_MANAGER_IPC_PORT_KEY}: $jobManagerPort")
- output.println(s"${ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY}: $logDirs")
output.println(s"${ConfigConstants.JOB_MANAGER_WEB_PORT_KEY}: $jobManagerWebPort")
http://git-wip-us.apache.org/repos/asf/flink/blob/e7119697/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
index 4ada21e..c8a9480 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/YarnJobManager.scala
@@ -691,8 +691,8 @@ class YarnJobManager(
}
tmCommand ++= s" ${taskManagerRunnerClass.getName} --configDir . 1> " +
- s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stdout.log 2> " +
- s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager-stderr.log"
+ s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.out 2> " +
+ s"${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/taskmanager.err"
tmCommand ++= " --streamingMode"
if(streamingMode) {