You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:47:03 UTC
[38/53] [abbrv] git commit: [FLINK-973] [FLINK-969] Unify, clean up,
and extend all environment logging at JobManager/TaskManager startup
[FLINK-973] [FLINK-969] Unify, clean up, and extend all environment logging at JobManager/TaskManager startup
This closes #40.
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/28863ee0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/28863ee0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/28863ee0
Branch: refs/heads/travis_test
Commit: 28863ee089bd0f81d6541a631c2d8699eaa71471
Parents: d55ba70
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jun 24 18:32:25 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Jun 25 11:04:50 2014 +0200
----------------------------------------------------------------------
.../nephele/jobmanager/JobManager.java | 19 +--
.../nephele/jobmanager/JobManagerUtils.java | 48 +-----
.../jobmanager/web/JobmanagerInfoServlet.java | 6 +-
.../nephele/taskmanager/TaskManager.java | 25 +--
.../runtime/util/EnvironmentInformation.java | 157 +++++++++++++++++++
5 files changed, 170 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index 6401407..877288c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -79,7 +79,6 @@ import eu.stratosphere.nephele.ipc.Server;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.JobManagerUtils.RevisionInformation;
import eu.stratosphere.nephele.jobmanager.accumulators.AccumulatorManager;
import eu.stratosphere.nephele.jobmanager.archive.ArchiveListener;
import eu.stratosphere.nephele.jobmanager.archive.MemoryArchivist;
@@ -103,6 +102,7 @@ import eu.stratosphere.nephele.taskmanager.TaskExecutionState;
import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import eu.stratosphere.runtime.util.EnvironmentInformation;
import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult;
import eu.stratosphere.nephele.topology.NetworkTopology;
@@ -287,17 +287,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
this.isShutDown = true;
LOG.debug("Shutdown of job manager completed");
}
-
- /**
- * Log Stratosphere version information.
- */
- private static void logVersionInformation() {
- RevisionInformation rev = JobManagerUtils.getRevisionInformation();
- LOG.info("Starting Stratosphere JobManager "
- + "(Version: " + JobManagerUtils.getVersion() + ", "
- + "Rev:" + rev.commitId + ", "
- + "Date:" + rev.commitDate + ")");
- }
/**
* Entry point for the program
@@ -340,9 +329,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
@SuppressWarnings("static-access")
public static JobManager initialize(String[] args) throws Exception {
- // output the version and revision information to the log
- logVersionInformation();
-
final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg()
.withDescription("Specify configuration directory.").create("configDir");
@@ -375,6 +361,9 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
System.exit(FAILURE_RETURN_CODE);
}
+ // print some startup environment info, like user, code revision, etc
+ EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager");
+
// First, try to load global configuration
GlobalConfiguration.loadConfiguration(configDir);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
index f2e1d33..31879b8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
@@ -13,18 +13,15 @@
package eu.stratosphere.nephele.jobmanager;
-import java.io.IOException;
-import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
-import java.util.Properties;
-import eu.stratosphere.nephele.ExecutionMode;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import eu.stratosphere.nephele.ExecutionMode;
+import eu.stratosphere.nephele.instance.InstanceManager;
+import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
import eu.stratosphere.util.StringUtils;
/**
@@ -167,43 +164,4 @@ public class JobManagerUtils {
throw new RuntimeException("Unrecognized Execution Mode.");
}
}
-
- /**
- * Returns the version of Stratosphere as String.
- * If version == null, then the JobManager runs from inside the IDE (or somehow not from the maven build jar)
- * @return String
- */
- public static String getVersion() {
- String version = JobManagerUtils.class.getPackage().getImplementationVersion();
- return version;
- }
-
- /**
- * Returns the revision of Stratosphere as String.
- * @return String
- */
- public static RevisionInformation getRevisionInformation() {
- RevisionInformation info = new RevisionInformation();
- String revision = "<unknown>";
- String commitDate = "<unknown>";
- try {
- Properties properties = new Properties();
- InputStream propFile = JobManagerUtils.class.getClassLoader().getResourceAsStream(".version.properties");
- if (propFile != null) {
- properties.load(propFile);
- revision = properties.getProperty("git.commit.id.abbrev");
- commitDate = properties.getProperty("git.commit.time");
- }
- } catch (IOException e) {
- LOG.info("Cannot determine code revision. Unable ro read version property file.");
- }
- info.commitId = revision;
- info.commitDate = commitDate;
- return info;
- }
-
- public static class RevisionInformation {
- public String commitId;
- public String commitDate;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java
index 8ffbd57..4b23399 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java
@@ -40,13 +40,13 @@ import eu.stratosphere.nephele.execution.ExecutionState;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.jobgraph.JobStatus;
import eu.stratosphere.nephele.jobmanager.JobManager;
-import eu.stratosphere.nephele.jobmanager.JobManagerUtils;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
import eu.stratosphere.nephele.managementgraph.ManagementGraphIterator;
import eu.stratosphere.nephele.managementgraph.ManagementGroupVertex;
import eu.stratosphere.nephele.managementgraph.ManagementGroupVertexID;
import eu.stratosphere.nephele.managementgraph.ManagementVertex;
import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent;
+import eu.stratosphere.runtime.util.EnvironmentInformation;
import eu.stratosphere.util.StringUtils;
@@ -516,8 +516,8 @@ public class JobmanagerInfoServlet extends HttpServlet {
*/
private void writeJsonForVersion(PrintWriter wrt) {
wrt.write("{");
- wrt.write("\"version\": \"" + JobManagerUtils.getVersion() + "\",");
- wrt.write("\"revision\": \"" + JobManagerUtils.getRevisionInformation().commitId + "\"");
+ wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\",");
+ wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\"");
wrt.write("}");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index d866c64..bedafaf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -58,7 +58,6 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
import eu.stratosphere.api.common.cache.DistributedCache;
import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
@@ -80,8 +79,6 @@ import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.ipc.Server;
import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.JobManagerUtils;
-import eu.stratosphere.nephele.jobmanager.JobManagerUtils.RevisionInformation;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
@@ -98,6 +95,7 @@ import eu.stratosphere.pact.runtime.cache.FileCache;
import eu.stratosphere.runtime.io.channels.ChannelID;
import eu.stratosphere.runtime.io.network.ChannelManager;
import eu.stratosphere.runtime.io.network.InsufficientResourcesException;
+import eu.stratosphere.runtime.util.EnvironmentInformation;
import eu.stratosphere.util.StringUtils;
/**
@@ -173,18 +171,6 @@ public class TaskManager implements TaskOperationProtocol {
if (executionMode == null) {
throw new NullPointerException("Execution mode must not be null.");
}
-
- RevisionInformation rev = JobManagerUtils.getRevisionInformation();
- LOG.info("Starting Stratosphere TaskManager "
- + "(Version: " + JobManagerUtils.getVersion() + ", "
- + "Rev:" + rev.commitId + ", "
- + "Date:" + rev.commitDate + ")");
-
- try {
- LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
- } catch (Throwable t) {
- LOG.error("Cannot determine user group information.", t);
- }
LOG.info("Execution mode: " + executionMode);
@@ -513,14 +499,9 @@ public class TaskManager implements TaskOperationProtocol {
LOG.info("Setting temporary directory to "+tempDirVal);
GlobalConfiguration.includeConfiguration(c);
}
- System.err.println("Configuration "+GlobalConfiguration.getConfiguration());
- LOG.info("Current user "+UserGroupInformation.getCurrentUser().getShortUserName());
- {
- // log the available JVM memory
- long maxMemoryMiBytes = Runtime.getRuntime().maxMemory() >>> 20;
- LOG.info("Starting TaskManager in a JVM with " + maxMemoryMiBytes + " MiBytes maximum heap size.");
- }
+ // print some startup environment info, like user, code revision, etc
+ EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");
// Create a new task manager object
try {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java
new file mode 100644
index 0000000..29d0804
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java
@@ -0,0 +1,157 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.util;
+
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class EnvironmentInformation {
+
+ private static final Log LOG = LogFactory.getLog(EnvironmentInformation.class);
+
+ private static final String UNKNOWN = "<unknown>";
+
+ private static final String LOG_FILE_OPTION = "-Dlog.file";
+
+ private static final String LOG_CONFIGURAION_OPTION = "-Dlog4j.configuration";
+
+ /**
+ * Returns the version of the code as String. If version == null, then the JobManager does not run from a
+ * maven build. An example is a source code checkout, compile, and run from inside an IDE.
+ *
+ * @return The version string.
+ */
+ public static String getVersion() {
+ return EnvironmentInformation.class.getPackage().getImplementationVersion();
+ }
+
+ /**
+ * Returns the code revision (commit and commit date) of Stratosphere.
+ *
+ * @return The code revision.
+ */
+ public static RevisionInformation getRevisionInformation() {
+ RevisionInformation info = new RevisionInformation();
+ String revision = UNKNOWN;
+ String commitDate = UNKNOWN;
+ try {
+ Properties properties = new Properties();
+ InputStream propFile = EnvironmentInformation.class.getClassLoader().getResourceAsStream(".version.properties");
+ if (propFile != null) {
+ properties.load(propFile);
+ revision = properties.getProperty("git.commit.id.abbrev");
+ commitDate = properties.getProperty("git.commit.time");
+ }
+ } catch (Throwable t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot determine code revision: Unable ro read version property file.", t);
+ } else {
+ LOG.info("Cannot determine code revision: Unable ro read version property file.");
+ }
+ }
+ info.commitId = revision;
+ info.commitDate = commitDate;
+ return info;
+ }
+
+ public static class RevisionInformation {
+ public String commitId;
+ public String commitDate;
+ }
+
+ public static String getUserRunning() {
+ try {
+ return UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (Throwable t) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cannot determine user/group information for the current user.", t);
+ } else {
+ LOG.info("Cannot determine user/group information for the current user.");
+ }
+ return UNKNOWN;
+ }
+ }
+
+ public static long getMaxJvmMemory() {
+ return Runtime.getRuntime().maxMemory() >>> 20;
+ }
+
+ public static String getJvmVersion() {
+ try {
+ final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+ return bean.getVmName() + " - " + bean.getVmVendor() + " - " + bean.getSpecVersion() + '/' + bean.getVmVersion();
+ }
+ catch (Throwable t) {
+ return UNKNOWN;
+ }
+ }
+
+ public static String getJvmStartupOptions() {
+ try {
+ final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+ final StringBuilder bld = new StringBuilder();
+ for (String s : bean.getInputArguments()) {
+ if (!s.startsWith(LOG_FILE_OPTION) && !s.startsWith(LOG_CONFIGURAION_OPTION)) {
+ bld.append(s).append(' ');
+ }
+ }
+ return bld.toString();
+ }
+ catch (Throwable t) {
+ return UNKNOWN;
+ }
+ }
+
+ public static void logEnvironmentInfo(Log log, String componentName) {
+ if (log.isInfoEnabled()) {
+ RevisionInformation rev = getRevisionInformation();
+ String version = getVersion();
+
+ String user = getUserRunning();
+
+ String jvmVersion = getJvmVersion();
+ String options = getJvmStartupOptions();
+
+ String javaHome = System.getenv("JAVA_HOME");
+
+ long memory = getMaxJvmMemory();
+
+ log.info("-------------------------------------------------------");
+ log.info(" Starting " + componentName + " (Version: " + version + ", "
+ + "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
+ log.info(" Current user: " + user);
+ log.info(" JVM: " + jvmVersion);
+ log.info(" Startup Options: " + options);
+ log.info(" Maximum heap size: " + memory + " MiBytes");
+ log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome));
+ log.info("-------------------------------------------------------");
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private EnvironmentInformation() {}
+
+ public static void main(String[] args) {
+ logEnvironmentInfo(LOG, "Test");
+ }
+}