You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/08 20:31:11 UTC

[4/9] flink git commit: [FLINK-2208] [runtime] Fix reflective access to CPU load to support IBM Java

[FLINK-2208] [runtime] Fix reflective access to CPU load to support IBM Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60cfa0b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60cfa0b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60cfa0b2

Branch: refs/heads/master
Commit: 60cfa0b2dcb4dae1637d867c4c63c67b3d26adf9
Parents: 8acc0d2
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 8 11:58:00 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 8 20:28:40 2015 +0200

----------------------------------------------------------------------
 .../flink/runtime/taskmanager/TaskManager.scala | 47 +++++++++-----------
 1 file changed, 21 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/60cfa0b2/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 612d5c0..1c60e89 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -22,7 +22,7 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, InetSocketAddress}
 import java.util.concurrent.TimeUnit
 import java.lang.reflect.Method
-import java.lang.management.ManagementFactory
+import java.lang.management.{OperatingSystemMXBean, ManagementFactory}
 
 import akka.actor._
 import akka.pattern.ask
@@ -35,7 +35,7 @@ import com.codahale.metrics.jvm.{MemoryUsageGaugeSet, GarbageCollectorMetricSet}
 import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
-import org.apache.flink.configuration._
+import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException}
 import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage}
 import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
@@ -1752,40 +1752,35 @@ object TaskManager {
         ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage()
     })
 
-    // Preprocessing steps for registering cpuLoad
-    val fetchCPULoad = getMethodToFetchCPULoad()
-
-    // Log getProcessCpuLoad unavailable for Java 6
-    if(fetchCPULoad.isEmpty){
-      LOG.warn("getProcessCpuLoad method not available in the Operating System Bean" +
-        "implementation for this Java runtime environment\n" +
-        Thread.currentThread().getStackTrace)
-    }
+    // Pre-processing steps for registering cpuLoad
+    val osBean: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean()
+        
+    val fetchCPULoadMethod: Option[Method] = 
+      try {
+        Class.forName("com.sun.management.OperatingSystemMXBean")
+          .getMethods()
+          .find( _.getName() == "getProcessCpuLoad" )
+      }
+      catch {
+        case t: Throwable =>
+          LOG.warn("Cannot access com.sun.management.OperatingSystemMXBean.getProcessCpuLoad()" +
+            " - CPU load metrics will not be available.")
+          None
+      }
 
     metricRegistry.register("cpuLoad", new Gauge[Double] {
       override def getValue: Double = {
         try{
-          fetchCPULoad.map(_.invoke(ManagementFactory.getOperatingSystemMXBean().
-            asInstanceOf[com.sun.management.OperatingSystemMXBean]).
-            asInstanceOf[Double]).getOrElse(-1)
-        } catch {
+          fetchCPULoadMethod.map(_.invoke(osBean).asInstanceOf[Double]).getOrElse(-1.0)
+        }
+        catch {
           case t: Throwable => {
             LOG.warn("Error retrieving CPU Load through OperatingSystemMXBean", t)
-            -1
+            -1.0
           }
         }
       }
     })
     metricRegistry
   }
-
-  /**
-   * Fetches getProcessCpuLoad method if available in the
-   *  OperatingSystemMXBean implementation else returns None
-   * @return
-   */
-  private def getMethodToFetchCPULoad(): Option[Method] = {
-    val methodsList = classOf[com.sun.management.OperatingSystemMXBean].getMethods()
-    methodsList.filter(_.getName == "getProcessCpuLoad").headOption
-  }
 }