You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/03/04 18:36:24 UTC

[spark] branch master updated: [SPARK-26792][CORE] Apply custom log URL to Spark UI

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d5bda2c  [SPARK-26792][CORE] Apply custom log URL to Spark UI
d5bda2c is described below

commit d5bda2c9e8dde6afc075cc7f65b15fa9aa82231c
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Mon Mar 4 10:36:04 2019 -0800

    [SPARK-26792][CORE] Apply custom log URL to Spark UI
    
    ## What changes were proposed in this pull request?
    
    [SPARK-23155](https://issues.apache.org/jira/browse/SPARK-23155) enables SHS to set up custom executor log URLs. This patch proposes to extend this feature to the Spark UI as well.
    
    Unlike the approach taken for SHS (which replaces executor log URLs when executor information is requested, so it is only a change of view), this patch replaces executor log URLs while registering the executor, which also affects the event log. From the SHS's point of view, the custom log URL applied to the Spark UI will be treated as the original log URL.
    
    ## How was this patch tested?
    
    Added UT.
    
    Closes #23790 from HeartSaVioR/SPARK-26792.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../deploy/history/HistoryAppStatusStore.scala     | 61 +-------------
 .../spark/executor/ExecutorLogUrlHandler.scala     | 93 ++++++++++++++++++++++
 .../org/apache/spark/internal/config/UI.scala      | 11 +++
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  6 +-
 .../CoarseGrainedSchedulerBackendSuite.scala       | 56 ++++++++++++-
 docs/configuration.md                              | 15 ++++
 6 files changed, 182 insertions(+), 60 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
index 751382c..73b2dc2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala
@@ -17,11 +17,8 @@
 
 package org.apache.spark.deploy.history
 
-import java.util.concurrent.atomic.AtomicBoolean
-
-import scala.util.matching.Regex
-
 import org.apache.spark.SparkConf
+import org.apache.spark.executor.ExecutorLogUrlHandler
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.History._
 import org.apache.spark.status.AppStatusStore
@@ -33,8 +30,6 @@ private[spark] class HistoryAppStatusStore(
     store: KVStore)
   extends AppStatusStore(store, None) with Logging {
 
-  import HistoryAppStatusStore._
-
   private val logUrlPattern: Option[String] = {
     val appInfo = super.applicationInfo()
     val applicationCompleted = appInfo.attempts.nonEmpty && appInfo.attempts.head.completed
@@ -45,7 +40,7 @@ private[spark] class HistoryAppStatusStore(
     }
   }
 
-  private val informedForMissingAttributes = new AtomicBoolean(false)
+  private val logUrlHandler = new ExecutorLogUrlHandler(logUrlPattern)
 
   override def executorList(activeOnly: Boolean): Seq[v1.ExecutorSummary] = {
     val execList = super.executorList(activeOnly)
@@ -64,56 +59,10 @@ private[spark] class HistoryAppStatusStore(
   }
 
   private def replaceLogUrls(exec: v1.ExecutorSummary, urlPattern: String): v1.ExecutorSummary = {
-    val attributes = exec.attributes
-
-    // Relation between pattern {{FILE_NAME}} and attribute {{LOG_FILES}}
-    // Given that HistoryAppStatusStore don't know which types of log files can be provided
-    // from resource manager, we require resource manager to provide available types of log
-    // files, which are encouraged to be same as types of log files provided in original log URLs.
-    // Once we get the list of log files, we need to expose them to end users as a pattern
-    // so that end users can compose custom log URL(s) including log file name(s).
-    val allPatterns = CUSTOM_URL_PATTERN_REGEX.findAllMatchIn(urlPattern).map(_.group(1)).toSet
-    val allPatternsExceptFileName = allPatterns.filter(_ != "FILE_NAME")
-    val allAttributeKeys = attributes.keySet
-    val allAttributeKeysExceptLogFiles = allAttributeKeys.filter(_ != "LOG_FILES")
-
-    if (allPatternsExceptFileName.diff(allAttributeKeysExceptLogFiles).nonEmpty) {
-      logFailToRenewLogUrls("some of required attributes are missing in app's event log.",
-        allPatternsExceptFileName, allAttributeKeys)
-      return exec
-    } else if (allPatterns.contains("FILE_NAME") && !allAttributeKeys.contains("LOG_FILES")) {
-      logFailToRenewLogUrls("'FILE_NAME' parameter is provided, but file information is " +
-        "missing in app's event log.", allPatternsExceptFileName, allAttributeKeys)
-      return exec
-    }
-
-    val updatedUrl = allPatternsExceptFileName.foldLeft(urlPattern) { case (orig, patt) =>
-      // we already checked the existence of attribute when comparing keys
-      orig.replace(s"{{$patt}}", attributes(patt))
-    }
-
-    val newLogUrlMap = if (allPatterns.contains("FILE_NAME")) {
-      // allAttributeKeys should contain "LOG_FILES"
-      attributes("LOG_FILES").split(",").map { file =>
-        file -> updatedUrl.replace("{{FILE_NAME}}", file)
-      }.toMap
-    } else {
-      Map("log" -> updatedUrl)
-    }
-
+    val newLogUrlMap = logUrlHandler.applyPattern(exec.executorLogs, exec.attributes)
     replaceExecutorLogs(exec, newLogUrlMap)
   }
 
-  private def logFailToRenewLogUrls(
-      reason: String,
-      allPatterns: Set[String],
-      allAttributes: Set[String]): Unit = {
-    if (informedForMissingAttributes.compareAndSet(false, true)) {
-      logInfo(s"Fail to renew executor log urls: $reason. Required: $allPatterns / " +
-        s"available: $allAttributes. Falling back to show app's original log urls.")
-    }
-  }
-
   private def replaceExecutorLogs(
       source: v1.ExecutorSummary,
       newExecutorLogs: Map[String, String]): v1.ExecutorSummary = {
@@ -127,7 +76,3 @@ private[spark] class HistoryAppStatusStore(
   }
 
 }
-
-private[spark] object HistoryAppStatusStore {
-  val CUSTOM_URL_PATTERN_REGEX: Regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r
-}
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala
new file mode 100644
index 0000000..0ddeef8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorLogUrlHandler.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.util.matching.Regex
+
+import org.apache.spark.internal.Logging
+
+private[spark] class ExecutorLogUrlHandler(logUrlPattern: Option[String]) extends Logging {
+  import ExecutorLogUrlHandler._
+
+  private val informedForMissingAttributes = new AtomicBoolean(false)
+
+  def applyPattern(
+      logUrls: Map[String, String],
+      attributes: Map[String, String]): Map[String, String] = {
+    logUrlPattern match {
+      case Some(pattern) => doApplyPattern(logUrls, attributes, pattern)
+      case None => logUrls
+    }
+  }
+
+  private def doApplyPattern(
+      logUrls: Map[String, String],
+      attributes: Map[String, String],
+      urlPattern: String): Map[String, String] = {
+    // Relation between pattern {{FILE_NAME}} and attribute {{LOG_FILES}}
+    // Given that this class don't know which types of log files can be provided
+    // from resource manager, we require resource manager to provide available types of log
+    // files, which are encouraged to be same as types of log files provided in original log URLs.
+    // Once we get the list of log files, we need to expose them to end users as a pattern
+    // so that end users can compose custom log URL(s) including log file name(s).
+    val allPatterns = CUSTOM_URL_PATTERN_REGEX.findAllMatchIn(urlPattern).map(_.group(1)).toSet
+    val allPatternsExceptFileName = allPatterns.filter(_ != "FILE_NAME")
+    val allAttributeKeys = attributes.keySet
+    val allAttributeKeysExceptLogFiles = allAttributeKeys.filter(_ != "LOG_FILES")
+
+    if (allPatternsExceptFileName.diff(allAttributeKeysExceptLogFiles).nonEmpty) {
+      logFailToRenewLogUrls("some of required attributes are missing in app's event log.",
+        allPatternsExceptFileName, allAttributeKeys)
+      logUrls
+    } else if (allPatterns.contains("FILE_NAME") && !allAttributeKeys.contains("LOG_FILES")) {
+      logFailToRenewLogUrls("'FILE_NAME' parameter is provided, but file information is " +
+        "missing in app's event log.", allPatternsExceptFileName, allAttributeKeys)
+      logUrls
+    } else {
+      val updatedUrl = allPatternsExceptFileName.foldLeft(urlPattern) { case (orig, patt) =>
+        // we already checked the existence of attribute when comparing keys
+        orig.replace(s"{{$patt}}", attributes(patt))
+      }
+
+      if (allPatterns.contains("FILE_NAME")) {
+        // allAttributeKeys should contain "LOG_FILES"
+        attributes("LOG_FILES").split(",").map { file =>
+          file -> updatedUrl.replace("{{FILE_NAME}}", file)
+        }.toMap
+      } else {
+        Map("log" -> updatedUrl)
+      }
+    }
+  }
+
+  private def logFailToRenewLogUrls(
+      reason: String,
+      allPatterns: Set[String],
+      allAttributes: Set[String]): Unit = {
+    if (informedForMissingAttributes.compareAndSet(false, true)) {
+      logInfo(s"Fail to renew executor log urls: $reason. Required: $allPatterns / " +
+        s"available: $allAttributes. Falling back to show app's original log urls.")
+    }
+  }
+}
+
+private[spark] object ExecutorLogUrlHandler {
+  val CUSTOM_URL_PATTERN_REGEX: Regex = "\\{\\{([A-Za-z0-9_\\-]+)\\}\\}".r
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/UI.scala b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
index 6c04f0d..a11970e 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/UI.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/UI.scala
@@ -142,4 +142,15 @@ private[spark] object UI {
   val USER_GROUPS_MAPPING = ConfigBuilder("spark.user.groups.mapping")
     .stringConf
     .createWithDefault("org.apache.spark.security.ShellBasedGroupsMappingProvider")
+
+  val CUSTOM_EXECUTOR_LOG_URL = ConfigBuilder("spark.ui.custom.executor.log.url")
+    .doc("Specifies custom spark executor log url for supporting external log service instead of " +
+      "using cluster managers' application log urls in the Spark UI. Spark will support " +
+      "some path variables via patterns which can vary on cluster manager. Please check the " +
+      "documentation for your cluster manager to see which patterns are supported, if any. " +
+      "This configuration replaces original log urls in event log, which will be also effective " +
+      "when accessing the application on history server. The new log urls must be permanent, " +
+      "otherwise you might have dead link for executor log urls.")
+    .stringConf
+    .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6bba157..6e8aa47 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.security.HadoopDelegationTokenManager
+import org.apache.spark.executor.ExecutorLogUrlHandler
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Network._
@@ -125,6 +126,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       .filter { case (k, _) => k.startsWith("spark.") }
       .toSeq
 
+    private val logUrlHandler: ExecutorLogUrlHandler = new ExecutorLogUrlHandler(
+      conf.get(UI.CUSTOM_EXECUTOR_LOG_URL))
+
     override def onStart() {
       // Periodically revive offers to allow delay scheduling to work
       val reviveIntervalMs = conf.get(SCHEDULER_REVIVE_INTERVAL).getOrElse(1000L)
@@ -207,7 +211,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
           val data = new ExecutorData(executorRef, executorAddress, hostname,
-            cores, cores, logUrls, attributes)
+            cores, cores, logUrlHandler.applyPattern(logUrls, attributes), attributes)
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 480e861..ae306d3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -22,11 +22,15 @@ import java.util.concurrent.atomic.AtomicBoolean
 import scala.concurrent.duration._
 
 import org.scalatest.concurrent.Eventually
+import org.scalatest.mockito.MockitoSugar._
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkException, SparkFunSuite}
-import org.apache.spark.internal.config.CPUS_PER_TASK
+import org.apache.spark.internal.config.{CPUS_PER_TASK, UI}
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.{RpcUtils, SerializableBuffer}
 
 class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext
@@ -120,6 +124,56 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     }
   }
 
+  // Here we just have test for one happy case instead of all cases: other cases are covered in
+  // FsHistoryProviderSuite.
+  test("custom log url for Spark UI is applied") {
+    val customExecutorLogUrl = "http://newhost:9999/logs/clusters/{{CLUSTER_ID}}/users/{{USER}}" +
+      "/containers/{{CONTAINER_ID}}/{{FILE_NAME}}"
+
+    val conf = new SparkConf()
+      .set(UI.CUSTOM_EXECUTOR_LOG_URL, customExecutorLogUrl)
+      .setMaster("local-cluster[0, 3, 1024]")
+      .setAppName("test")
+
+    sc = new SparkContext(conf)
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+    val mockEndpointRef = mock[RpcEndpointRef]
+    val mockAddress = mock[RpcAddress]
+
+    val logUrls = Map(
+      "stdout" -> "http://oldhost:8888/logs/dummy/stdout",
+      "stderr" -> "http://oldhost:8888/logs/dummy/stderr")
+    val attributes = Map(
+      "CLUSTER_ID" -> "cl1",
+      "USER" -> "dummy",
+      "CONTAINER_ID" -> "container1",
+      "LOG_FILES" -> "stdout,stderr")
+    val baseUrl = s"http://newhost:9999/logs/clusters/${attributes("CLUSTER_ID")}" +
+      s"/users/${attributes("USER")}/containers/${attributes("CONTAINER_ID")}"
+
+    var executorAddedCount: Int = 0
+    val listener = new SparkListener() {
+      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+        executorAddedCount += 1
+        assert(executorAdded.executorInfo.logUrlMap === Seq("stdout", "stderr").map { file =>
+          file -> (baseUrl + s"/$file")
+        }.toMap)
+      }
+    }
+
+    sc.addSparkListener(listener)
+
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("1", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("2", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
+    backend.driverEndpoint.askSync[Boolean](
+      RegisterExecutor("3", mockEndpointRef, mockAddress.host, 1, logUrls, attributes))
+
+    sc.listenerBus.waitUntilEmpty(executorUpTimeout.toMillis)
+    assert(executorAddedCount === 3)
+  }
+
   private def testSubmitJob(sc: SparkContext, rdd: RDD[Int]): Unit = {
     sc.submitJob(
       rdd,
diff --git a/docs/configuration.md b/docs/configuration.md
index 5b5891e..f23dc7c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -914,6 +914,21 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.ui.custom.executor.log.url</code></td>
+  <td>(none)</td>
+  <td>
+    Specifies custom spark executor log URL for supporting external log service instead of using cluster
+    managers' application log URLs in Spark UI. Spark will support some path variables via patterns
+    which can vary on cluster manager. Please check the documentation for your cluster manager to
+    see which patterns are supported, if any. <p/>
+    Please note that this configuration also replaces original log urls in event log,
+    which will be also effective when accessing the application on history server. The new log urls must be
+    permanent, otherwise you might have dead link for executor log urls.
+    <p/>
+    For now, only YARN mode supports this configuration
+  </td>
+</tr>
+<tr>
   <td><code>spark.worker.ui.retainedExecutors</code></td>
   <td>1000</td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org