You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by squito <gi...@git.apache.org> on 2018/12/03 17:54:19 UTC
[GitHub] spark pull request #22612: [SPARK-24958] Add executors' process tree total m...
Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/22612#discussion_r238366634
--- Diff: core/src/main/scala/org/apache/spark/executor/ProcfsMetricsGetter.scala ---
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
import java.io._
import java.nio.charset.Charset
import java.nio.file.{Files, Paths}
import java.util.Locale

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
import scala.util.control.NonFatal

import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.util.Utils
+
+
/**
 * Totals of process-tree memory usage read from procfs, bucketed by process
 * category (JVM / python / other). "Vmem" is virtual memory size and "RSS" is
 * resident set size, both in bytes, summed over all processes in the bucket.
 */
private[spark] case class ProcfsMetrics(
    jvmVmemTotal: Long,
    jvmRSSTotal: Long,
    pythonVmemTotal: Long,
    pythonRSSTotal: Long,
    otherVmemTotal: Long,
    otherRSSTotal: Long)
+
+// Some of the ideas here are taken from the ProcfsBasedProcessTree class in hadoop
+// project.
/**
 * Collects per-category memory metrics for this executor's process tree by
 * reading procfs stat files.
 *
 * Some of the ideas here are taken from the ProcfsBasedProcessTree class in
 * the hadoop project.
 *
 * @param procfsDir root of the procfs mount, overridable for testing
 */
private[spark] class ProcfsMetricsGetter(val procfsDir: String = "/proc/") extends Logging {
  // Name of the per-process stat file under /proc/[pid]/.
  val procfsStatFile = "stat"
  // True when running under Spark's test harness; short-circuits probes below.
  val testing = sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing")
  // Memory page size in bytes, used to convert the rss field (pages) to bytes.
  val pageSize = computePageSize()
  // Flipped to false permanently if any probe or command fails.
  var isAvailable: Boolean = isProcfsAvailable
  // This executor's own pid (root of the process tree); -1 when unavailable.
  private val pid = computePid()
+
+ private lazy val isProcfsAvailable: Boolean = {
+ if (testing) {
+ true
+ }
+ else {
+ val procDirExists = Try(Files.exists(Paths.get(procfsDir))).recover {
+ case ioe: IOException =>
+ logWarning("Exception checking for procfs dir", ioe)
+ false
+ }
+ val shouldLogStageExecutorMetrics =
+ SparkEnv.get.conf.get(config.EVENT_LOG_STAGE_EXECUTOR_METRICS)
+ val shouldLogStageExecutorProcessTreeMetrics =
+ SparkEnv.get.conf.get(config.EVENT_LOG_PROCESS_TREE_METRICS)
+ procDirExists.get && shouldLogStageExecutorProcessTreeMetrics && shouldLogStageExecutorMetrics
+ }
+ }
+
+ private def computePid(): Int = {
+ if (!isAvailable || testing) {
+ return -1;
+ }
+ try {
+ // This can be simplified in java9:
+ // https://docs.oracle.com/javase/9/docs/api/java/lang/ProcessHandle.html
+ val cmd = Array("bash", "-c", "echo $PPID")
+ val out = Utils.executeAndGetOutput(cmd)
+ val pid = Integer.parseInt(out.split("\n")(0))
+ return pid;
+ }
+ catch {
+ case e: SparkException =>
+ logWarning("Exception when trying to compute process tree." +
+ " As a result reporting of ProcessTree metrics is stopped", e)
+ isAvailable = false
+ -1
+ }
+ }
+
+ private def computePageSize(): Long = {
+ if (testing) {
+ return 4096;
+ }
+ try {
+ val cmd = Array("getconf", "PAGESIZE")
+ val out = Utils.executeAndGetOutput(cmd)
+ Integer.parseInt(out.split("\n")(0))
+ } catch {
+ case e: Exception =>
+ logWarning("Exception when trying to compute pagesize, as a" +
+ " result reporting of ProcessTree metrics is stopped")
+ isAvailable = false
+ 0
+ }
+ }
+
+ private def computeProcessTree(): Set[Int] = {
+ if (!isAvailable || testing) {
+ return Set()
+ }
+ var ptree: Set[Int] = Set()
+ ptree += pid
+ val queue = mutable.Queue.empty[Int]
+ queue += pid
+ while ( !queue.isEmpty ) {
+ val p = queue.dequeue()
+ val c = getChildPids(p)
+ if (!c.isEmpty) {
+ queue ++= c
+ ptree ++= c.toSet
+ }
+ }
+ ptree
+ }
+
+ private def getChildPids(pid: Int): ArrayBuffer[Int] = {
+ try {
+ val builder = new ProcessBuilder("pgrep", "-P", pid.toString)
+ val process = builder.start()
+ val childPidsInInt = mutable.ArrayBuffer.empty[Int]
+ def appendChildPid(s: String): Unit = {
+ if (s != "") {
+ logTrace("Found a child pid:" + s)
+ childPidsInInt += Integer.parseInt(s)
+ }
+ }
+ val stdoutThread = Utils.processStreamByLine("read stdout for pgrep",
+ process.getInputStream, appendChildPid)
+ val errorStringBuilder = new StringBuilder()
+ val stdErrThread = Utils.processStreamByLine(
+ "stderr for pgrep",
+ process.getErrorStream,
+ line => errorStringBuilder.append(line))
+ val exitCode = process.waitFor()
+ stdoutThread.join()
+ stdErrThread.join()
+ val errorString = errorStringBuilder.toString()
+ // pgrep will have exit code of 1 if there are more than one child process
+ // and it will have a exit code of 2 if there is no child process
+ if (exitCode != 0 && exitCode > 2) {
+ val cmd = builder.command().toArray.mkString(" ")
+ logWarning(s"Process $cmd exited with code $exitCode and stderr: $errorString")
+ throw new SparkException(s"Process $cmd exited with code $exitCode")
+ }
+ childPidsInInt
+ } catch {
+ case e: Exception =>
+ logWarning("Exception when trying to compute process tree." +
+ " As a result reporting of ProcessTree metrics is stopped.", e)
+ isAvailable = false
+ mutable.ArrayBuffer.empty[Int]
+ }
+ }
+
+ def addProcfsMetricsFromOneProcess(
+ allMetrics: ProcfsMetrics,
+ pid: Int): ProcfsMetrics = {
+
+ // The computation of RSS and Vmem are based on proc(5):
+ // http://man7.org/linux/man-pages/man5/proc.5.html
+ try {
+ val pidDir = new File(procfsDir, pid.toString)
+ Utils.tryWithResource(new InputStreamReader(
+ new FileInputStream(
+ new File(pidDir, procfsStatFile)), Charset.forName("UTF-8"))) { fReader =>
+ val in = new BufferedReader(fReader)
+ val procInfo = in.readLine
+ val procInfoSplit = procInfo.split(" ")
+ val vmem = procInfoSplit(22).toLong
+ val rssMem = procInfoSplit(23).toLong * pageSize
+ if (procInfoSplit(1).toLowerCase(Locale.US).contains("java")) {
+ allMetrics.copy(
+ jvmVmemTotal = allMetrics.jvmVmemTotal + vmem,
+ jvmRSSTotal = allMetrics.jvmRSSTotal + (rssMem)
+ )
+ }
+ else if (procInfoSplit(1).toLowerCase(Locale.US).contains("python")) {
+ allMetrics.copy(
+ pythonVmemTotal = allMetrics.pythonVmemTotal + vmem,
+ pythonRSSTotal = allMetrics.pythonRSSTotal + (rssMem)
+ )
+ }
+ else {
+ allMetrics.copy(
+ otherVmemTotal = allMetrics.otherVmemTotal + vmem,
+ otherRSSTotal = allMetrics.otherRSSTotal + (rssMem)
+ )
+ }
+ }
+ } catch {
+ case f: IOException =>
+ logWarning("There was a problem with reading" +
+ " the stat file of the process. ", f)
+ ProcfsMetrics(0, 0, 0, 0, 0, 0)
+ }
+ }
+
+ private[spark] def computeAllMetrics(): ProcfsMetrics = {
+ if (!isAvailable) {
+ return ProcfsMetrics(0, 0, 0, 0, 0, 0)
+ }
+ val pids = computeProcessTree
+ var allMetrics = ProcfsMetrics(0, 0, 0, 0, 0, 0)
+ for (p <- pids) {
+ allMetrics = addProcfsMetricsFromOneProcess(allMetrics, p)
+ }
+ allMetrics
--- End diff --
one weird thing about the error handling here -- if you get any error from `computeProcessTree` through all calls to `addProcfsMetricsFromOneProcess`, then you'll set `isAvailable = false`, but you'll still return `allMetrics`. Depending on when the error occurs, you might have some partially accumulated metrics in there. I think if there is any error, you probably want to return empty metrics.
Also you'll leave `isAvailable = false` forever after that -- is that OK? I guess I don't really have strong feelings about this one; maybe it's OK to give up forever for this executor.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org