You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/06/26 11:47:03 UTC

[38/53] [abbrv] git commit: [FLINK-973] [FLINK-969] Unify, clean up, and extend all environment logging at JobManager/TaskManager startup

[FLINK-973] [FLINK-969] Unify, clean up, and extend all environment logging at JobManager/TaskManager startup

This closes #40.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/28863ee0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/28863ee0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/28863ee0

Branch: refs/heads/travis_test
Commit: 28863ee089bd0f81d6541a631c2d8699eaa71471
Parents: d55ba70
Author: Stephan Ewen <se...@apache.org>
Authored: Tue Jun 24 18:32:25 2014 +0200
Committer: uce <u....@fu-berlin.de>
Committed: Wed Jun 25 11:04:50 2014 +0200

----------------------------------------------------------------------
 .../nephele/jobmanager/JobManager.java          |  19 +--
 .../nephele/jobmanager/JobManagerUtils.java     |  48 +-----
 .../jobmanager/web/JobmanagerInfoServlet.java   |   6 +-
 .../nephele/taskmanager/TaskManager.java        |  25 +--
 .../runtime/util/EnvironmentInformation.java    | 157 +++++++++++++++++++
 5 files changed, 170 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
index 6401407..877288c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManager.java
@@ -79,7 +79,6 @@ import eu.stratosphere.nephele.ipc.Server;
 import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
 import eu.stratosphere.nephele.jobgraph.JobGraph;
 import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.JobManagerUtils.RevisionInformation;
 import eu.stratosphere.nephele.jobmanager.accumulators.AccumulatorManager;
 import eu.stratosphere.nephele.jobmanager.archive.ArchiveListener;
 import eu.stratosphere.nephele.jobmanager.archive.MemoryArchivist;
@@ -103,6 +102,7 @@ import eu.stratosphere.nephele.taskmanager.TaskExecutionState;
 import eu.stratosphere.nephele.taskmanager.TaskSubmissionResult;
 import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
 import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import eu.stratosphere.runtime.util.EnvironmentInformation;
 import eu.stratosphere.nephele.taskmanager.ExecutorThreadFactory;
 import eu.stratosphere.nephele.taskmanager.transferenvelope.RegisterTaskManagerResult;
 import eu.stratosphere.nephele.topology.NetworkTopology;
@@ -287,17 +287,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		this.isShutDown = true;
 		LOG.debug("Shutdown of job manager completed");
 	}
-
-	/**
-	 * Log Stratosphere version information.
-	 */
-	private static void logVersionInformation() {
-		RevisionInformation rev = JobManagerUtils.getRevisionInformation();
-		LOG.info("Starting Stratosphere JobManager "
-				+ "(Version: " + JobManagerUtils.getVersion() + ", "
-					+ "Rev:" + rev.commitId + ", "
-					+ "Date:" + rev.commitDate + ")");
-	}
 	
 	/**
 	 * Entry point for the program
@@ -340,9 +329,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	
 	@SuppressWarnings("static-access")
 	public static JobManager initialize(String[] args) throws Exception {
-		// output the version and revision information to the log
-		logVersionInformation();
-		
 		final Option configDirOpt = OptionBuilder.withArgName("config directory").hasArg()
 			.withDescription("Specify configuration directory.").create("configDir");
 
@@ -375,6 +361,9 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			System.exit(FAILURE_RETURN_CODE);
 		}
 		
+		// print some startup environment info, like user, code revision, etc
+		EnvironmentInformation.logEnvironmentInfo(LOG, "JobManager");
+		
 		// First, try to load global configuration
 		GlobalConfiguration.loadConfiguration(configDir);
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
index f2e1d33..31879b8 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/JobManagerUtils.java
@@ -13,18 +13,15 @@
 
 package eu.stratosphere.nephele.jobmanager;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.Properties;
 
-import eu.stratosphere.nephele.ExecutionMode;
-import eu.stratosphere.nephele.instance.InstanceManager;
-import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import eu.stratosphere.nephele.ExecutionMode;
+import eu.stratosphere.nephele.instance.InstanceManager;
+import eu.stratosphere.nephele.jobmanager.scheduler.DefaultScheduler;
 import eu.stratosphere.util.StringUtils;
 
 /**
@@ -167,43 +164,4 @@ public class JobManagerUtils {
 			throw new RuntimeException("Unrecognized Execution Mode.");
 		}
 	}
-	
-	/**
-	 * Returns the version of Stratosphere as String.
-	 * If version == null, then the JobManager runs from inside the IDE (or somehow not from the maven build jar)
-	 * @return String
-	 */
-	public static String getVersion() {
-		String version = JobManagerUtils.class.getPackage().getImplementationVersion();
-		return version;
-	}
-
-	/**
-	 * Returns the revision of Stratosphere as String.
-	 * @return String
-	 */
-	public static RevisionInformation getRevisionInformation() {
-		RevisionInformation info = new RevisionInformation();
-		String revision = "<unknown>";
-		String commitDate = "<unknown>";
-		try {
-			Properties properties = new Properties();
-			InputStream propFile = JobManagerUtils.class.getClassLoader().getResourceAsStream(".version.properties");
-			if (propFile != null) {
-				properties.load(propFile);
-				revision = properties.getProperty("git.commit.id.abbrev");
-				commitDate = properties.getProperty("git.commit.time");
-			}
-		} catch (IOException e) {
-			LOG.info("Cannot determine code revision. Unable ro read version property file.");
-		}
-		info.commitId = revision;
-		info.commitDate = commitDate;
-		return info;
-	}
-	
-	public static class RevisionInformation {
-		public String commitId;
-		public String commitDate;
-	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java
index 8ffbd57..4b23399 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/jobmanager/web/JobmanagerInfoServlet.java
@@ -40,13 +40,13 @@ import eu.stratosphere.nephele.execution.ExecutionState;
 import eu.stratosphere.nephele.jobgraph.JobID;
 import eu.stratosphere.nephele.jobgraph.JobStatus;
 import eu.stratosphere.nephele.jobmanager.JobManager;
-import eu.stratosphere.nephele.jobmanager.JobManagerUtils;
 import eu.stratosphere.nephele.managementgraph.ManagementGraph;
 import eu.stratosphere.nephele.managementgraph.ManagementGraphIterator;
 import eu.stratosphere.nephele.managementgraph.ManagementGroupVertex;
 import eu.stratosphere.nephele.managementgraph.ManagementGroupVertexID;
 import eu.stratosphere.nephele.managementgraph.ManagementVertex;
 import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent;
+import eu.stratosphere.runtime.util.EnvironmentInformation;
 import eu.stratosphere.util.StringUtils;
 
 
@@ -516,8 +516,8 @@ public class JobmanagerInfoServlet extends HttpServlet {
 	 */
 	private void writeJsonForVersion(PrintWriter wrt) {
 		wrt.write("{");
-		wrt.write("\"version\": \"" + JobManagerUtils.getVersion() + "\",");
-		wrt.write("\"revision\": \"" + JobManagerUtils.getRevisionInformation().commitId + "\"");
+		wrt.write("\"version\": \"" + EnvironmentInformation.getVersion() + "\",");
+		wrt.write("\"revision\": \"" + EnvironmentInformation.getRevisionInformation().commitId + "\"");
 		wrt.write("}");
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
index d866c64..bedafaf 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java
@@ -58,7 +58,6 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.security.UserGroupInformation;
 
 import eu.stratosphere.api.common.cache.DistributedCache;
 import eu.stratosphere.api.common.cache.DistributedCache.DistributedCacheEntry;
@@ -80,8 +79,6 @@ import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
 import eu.stratosphere.nephele.ipc.RPC;
 import eu.stratosphere.nephele.ipc.Server;
 import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.jobmanager.JobManagerUtils;
-import eu.stratosphere.nephele.jobmanager.JobManagerUtils.RevisionInformation;
 import eu.stratosphere.nephele.net.NetUtils;
 import eu.stratosphere.nephele.profiling.ProfilingUtils;
 import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
@@ -98,6 +95,7 @@ import eu.stratosphere.pact.runtime.cache.FileCache;
 import eu.stratosphere.runtime.io.channels.ChannelID;
 import eu.stratosphere.runtime.io.network.ChannelManager;
 import eu.stratosphere.runtime.io.network.InsufficientResourcesException;
+import eu.stratosphere.runtime.util.EnvironmentInformation;
 import eu.stratosphere.util.StringUtils;
 
 /**
@@ -173,18 +171,6 @@ public class TaskManager implements TaskOperationProtocol {
 		if (executionMode == null) {
 			throw new NullPointerException("Execution mode must not be null.");
 		}
-		
-		RevisionInformation rev = JobManagerUtils.getRevisionInformation();
-		LOG.info("Starting Stratosphere TaskManager "
-				+ "(Version: " + JobManagerUtils.getVersion() + ", "
-					+ "Rev:" + rev.commitId + ", "
-					+ "Date:" + rev.commitDate + ")");
-		
-		try {
-			LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
-		} catch (Throwable t) {
-			LOG.error("Cannot determine user group information.", t);
-		}
 			
 		LOG.info("Execution mode: " + executionMode);
 
@@ -513,14 +499,9 @@ public class TaskManager implements TaskOperationProtocol {
 			LOG.info("Setting temporary directory to "+tempDirVal);
 			GlobalConfiguration.includeConfiguration(c);
 		}
-		System.err.println("Configuration "+GlobalConfiguration.getConfiguration());
-		LOG.info("Current user "+UserGroupInformation.getCurrentUser().getShortUserName());
 		
-		{
-			// log the available JVM memory
-			long maxMemoryMiBytes = Runtime.getRuntime().maxMemory() >>> 20;
-			LOG.info("Starting TaskManager in a JVM with " + maxMemoryMiBytes + " MiBytes maximum heap size.");
-		}
+		// print some startup environment info, like user, code revision, etc
+		EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager");
 		
 		// Create a new task manager object
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/28863ee0/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java
new file mode 100644
index 0000000..29d0804
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/util/EnvironmentInformation.java
@@ -0,0 +1,157 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.util;
+
+import java.io.InputStream;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
+
+public class EnvironmentInformation {
+
+	private static final Log LOG = LogFactory.getLog(EnvironmentInformation.class);
+
+	private static final String UNKNOWN = "<unknown>";
+
+	private static final String LOG_FILE_OPTION = "-Dlog.file";
+
+	private static final String LOG_CONFIGURAION_OPTION = "-Dlog4j.configuration";
+
+	/**
+	 * Returns the version of the code as String. If version == null, then the JobManager does not run from a
+	 * maven build. An example is a source code checkout, compile, and run from inside an IDE.
+	 * 
+	 * @return The version string.
+	 */
+	public static String getVersion() {
+		return EnvironmentInformation.class.getPackage().getImplementationVersion();
+	}
+
+	/**
+	 * Returns the code revision (commit and commit date) of Stratosphere.
+	 * 
+	 * @return The code revision.
+	 */
+	public static RevisionInformation getRevisionInformation() {
+		RevisionInformation info = new RevisionInformation();
+		String revision = UNKNOWN;
+		String commitDate = UNKNOWN;
+		try {
+			Properties properties = new Properties();
+			InputStream propFile = EnvironmentInformation.class.getClassLoader().getResourceAsStream(".version.properties");
+			if (propFile != null) {
+				properties.load(propFile);
+				revision = properties.getProperty("git.commit.id.abbrev");
+				commitDate = properties.getProperty("git.commit.time");
+			}
+		} catch (Throwable t) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Cannot determine code revision: Unable ro read version property file.", t);
+			} else {
+				LOG.info("Cannot determine code revision: Unable ro read version property file.");
+			}
+		}
+		info.commitId = revision;
+		info.commitDate = commitDate;
+		return info;
+	}
+
+	public static class RevisionInformation {
+		public String commitId;
+		public String commitDate;
+	}
+
+	public static String getUserRunning() {
+		try {
+			return UserGroupInformation.getCurrentUser().getShortUserName();
+		} catch (Throwable t) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Cannot determine user/group information for the current user.", t);
+			} else {
+				LOG.info("Cannot determine user/group information for the current user.");
+			}
+			return UNKNOWN;
+		}
+	}
+
+	public static long getMaxJvmMemory() {
+		return Runtime.getRuntime().maxMemory() >>> 20;
+	}
+
+	public static String getJvmVersion() {
+		try {
+			final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+			return bean.getVmName() + " - " + bean.getVmVendor() + " - " + bean.getSpecVersion() + '/' + bean.getVmVersion();
+		}
+		catch (Throwable t) {
+			return UNKNOWN;
+		}
+	}
+
+	public static String getJvmStartupOptions() {
+		try {
+			final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
+			final StringBuilder bld = new StringBuilder();
+			for (String s : bean.getInputArguments()) {
+				if (!s.startsWith(LOG_FILE_OPTION) && !s.startsWith(LOG_CONFIGURAION_OPTION)) {
+					bld.append(s).append(' ');
+				}
+			}
+			return bld.toString();
+		}
+		catch (Throwable t) {
+			return UNKNOWN;
+		}
+	}
+
+	public static void logEnvironmentInfo(Log log, String componentName) {
+		if (log.isInfoEnabled()) {
+			RevisionInformation rev = getRevisionInformation();
+			String version = getVersion();
+			
+			String user = getUserRunning();
+			
+			String jvmVersion = getJvmVersion();
+			String options = getJvmStartupOptions();
+			
+			String javaHome = System.getenv("JAVA_HOME");
+			
+			long memory = getMaxJvmMemory();
+			
+			log.info("-------------------------------------------------------");
+			log.info(" Starting " + componentName + " (Version: " + version + ", "
+					+ "Rev:" + rev.commitId + ", " + "Date:" + rev.commitDate + ")");
+			log.info(" Current user: " + user);
+			log.info(" JVM: " + jvmVersion);
+			log.info(" Startup Options: " + options);
+			log.info(" Maximum heap size: " + memory + " MiBytes");
+			log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome));
+			log.info("-------------------------------------------------------");
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	
+	private EnvironmentInformation() {}
+	
+	public static void main(String[] args) {
+		logEnvironmentInfo(LOG, "Test");
+	}
+}