You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/08 20:31:11 UTC
[4/9] flink git commit: [FLINK-2208] [runtime] Fix reflective access
to CPU load to support IBM Java
[FLINK-2208] [runtime] Fix reflective access to CPU load to support IBM Java
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60cfa0b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60cfa0b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60cfa0b2
Branch: refs/heads/master
Commit: 60cfa0b2dcb4dae1637d867c4c63c67b3d26adf9
Parents: 8acc0d2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 11:58:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200
----------------------------------------------------------------------
.../flink/runtime/taskmanager/TaskManager.scala | 47 +++++++++-----------
1 file changed, 21 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/60cfa0b2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 612d5c0..1c60e89 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -22,7 +22,7 @@ import java.io.{File, IOException}
import java.net.{InetAddress, InetSocketAddress}
import java.util.concurrent.TimeUnit
import java.lang.reflect.Method
-import java.lang.management.ManagementFactory
+import java.lang.management.{OperatingSystemMXBean, ManagementFactory}
import akka.actor._
import akka.pattern.ask
@@ -35,7 +35,7 @@ import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
import com.fasterxml.jackson.databind.ObjectMapper
import grizzled.slf4j.Logger
-import org.apache.flink.configuration._
+import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage}
import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
import org.apache.flink.runtime.akka.AkkaUtils
@@ -1752,40 +1752,35 @@ object TaskManager {
ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
})
- // Preprocessing steps for registering cpuLoad
- val fetchCPULoad = getMethodToFetchCPULoad()
-
- // Log getProcessCpuLoad unavailable for Java 6
- if(fetchCPULoad.isEmpty){
- LOG.warn("getProcessCpuLoad method not available in the Operating System Bean" +
- "implementation for this Java runtime environment\n" +
- Thread.currentThread().getStackTrace)
- }
+ // Pre-processing steps for registering cpuLoad
+ val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean()
+
+ val fetchCPULoadMethod: Option[Method] =
+ try {
+ Class.forName("com.sun.management.OperatingSystemMXBean")
+ .getMethods()
+ .find( _.getName() == "getProcessCpuLoad" )
+ }
+ catch {
+ case t: Throwable =>
+ LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+ " - CPU load metrics will not be available.")
+ None
+ }
metricRegistry.register("cpuLoad", new Gauge[Double] {
override def getValue: Double = {
try{
- fetchCPULoad.map(_.invoke(ManagementFactory.getOperatingSystemMXBean().
- asInstanceOf[com.sun.management.OperatingSystemMXBean]).
- asInstanceOf[Double]).getOrElse(-1)
- } catch {
+ fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
+ }
+ catch {
case t: Throwable => {
LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t)
- -1
+ -1.0
}
}
}
})
metricRegistry
}
-
- /**
- * Fetches getProcessCpuLoad method if available in the
- * OperatingSystemMXBean implementation else returns None
- * @return
- */
- private def getMethodToFetchCPULoad(): Option[Method] = {
- val methodsList = classOf[com.sun.management.OperatingSystemMXBean].getMethods()
- methodsList.filter(_.getName == "getProcessCpuLoad").headOption
- }
}