You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/01/17 11:52:20 UTC
[spark] branch master updated: [SPARK-26457] Show hadoop
configurations in HistoryServer environment tab
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 650b879 [SPARK-26457] Show hadoop configurations in HistoryServer environment tab
650b879 is described below
commit 650b879de9e3b426fd38cdf1a8abf701a0c4a086
Author: xiaodeshan <xi...@xiaomi.com>
AuthorDate: Thu Jan 17 05:51:43 2019 -0600
[SPARK-26457] Show hadoop configurations in HistoryServer environment tab
## What changes were proposed in this pull request?
YARN already exposes all Hadoop configurations, but it would be useful for the History Server to also show all configurations in one place. This makes debugging some problems more convenient.
## How was this patch tested?
![image](https://user-images.githubusercontent.com/42019462/50808610-4d742900-133a-11e9-868c-2976e856ed9a.png)
Closes #23486 from deshanxiao/spark-26457.
Lead-authored-by: xiaodeshan <xi...@xiaomi.com>
Co-authored-by: deshanxiao <42...@users.noreply.github.com>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../resources/org/apache/spark/ui/static/webui.js | 1 +
.../main/scala/org/apache/spark/SparkContext.scala | 4 ++--
core/src/main/scala/org/apache/spark/SparkEnv.scala | 7 +++++++
.../org/apache/spark/status/AppStatusListener.scala | 1 +
.../scala/org/apache/spark/status/api/v1/api.scala | 1 +
.../org/apache/spark/ui/env/EnvironmentPage.scala | 21 +++++++++++++++++----
.../scala/org/apache/spark/util/JsonProtocol.scala | 6 ++++++
.../app_environment_expectation.json | 5 +++++
.../resources/spark-events/app-20161116163331-0000 | 2 +-
.../deploy/history/FsHistoryProviderSuite.scala | 2 ++
.../spark/scheduler/EventLoggingListenerSuite.scala | 3 ++-
.../org/apache/spark/util/JsonProtocolSuite.scala | 4 ++++
project/MimaExcludes.scala | 3 +++
13 files changed, 52 insertions(+), 8 deletions(-)
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index b1254e0..9f5744a 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -63,6 +63,7 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-finishedDrivers','aggregated-finishedDrivers');
collapseTablePageLoad('collapse-aggregated-runtimeInformation','aggregated-runtimeInformation');
collapseTablePageLoad('collapse-aggregated-sparkProperties','aggregated-sparkProperties');
+ collapseTablePageLoad('collapse-aggregated-hadoopProperties','aggregated-hadoopProperties');
collapseTablePageLoad('collapse-aggregated-systemProperties','aggregated-systemProperties');
collapseTablePageLoad('collapse-aggregated-classpathEntries','aggregated-classpathEntries');
collapseTablePageLoad('collapse-aggregated-activeJobs','aggregated-activeJobs');
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c9afc79..e0c0635 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2370,8 +2370,8 @@ class SparkContext(config: SparkConf) extends Logging {
val schedulingMode = getSchedulingMode.toString
val addedJarPaths = addedJars.keys.toSeq
val addedFilePaths = addedFiles.keys.toSeq
- val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,
- addedFilePaths)
+ val environmentDetails = SparkEnv.environmentDetails(conf, hadoopConfiguration,
+ schedulingMode, addedJarPaths, addedFilePaths)
val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails)
listenerBus.post(environmentUpdate)
}
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 4d7542c..ff4a043 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -21,10 +21,12 @@ import java.io.File
import java.net.Socket
import java.util.Locale
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Properties
import com.google.common.collect.MapMaker
+import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
@@ -400,6 +402,7 @@ object SparkEnv extends Logging {
private[spark]
def environmentDetails(
conf: SparkConf,
+ hadoopConf: Configuration,
schedulingMode: String,
addedJars: Seq[String],
addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = {
@@ -435,9 +438,13 @@ object SparkEnv extends Logging {
val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User"))
val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted
+ // Add Hadoop properties. Configs that are also set through Spark are not filtered out;
+ // Spark confs prefixed with "spark.hadoop." may have overwritten their values.
+ val hadoopProperties = hadoopConf.asScala.map(entry => (entry.getKey, entry.getValue)).toSeq
Map[String, Seq[(String, String)]](
"JVM Information" -> jvmInformation,
"Spark Properties" -> sparkProperties,
+ "Hadoop Properties" -> hadoopProperties,
"System Properties" -> otherProperties,
"Classpath Entries" -> classPaths)
}
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 262ff65..12c5d4d 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -147,6 +147,7 @@ private[spark] class AppStatusListener(
val envInfo = new v1.ApplicationEnvironmentInfo(
runtime,
details.getOrElse("Spark Properties", Nil),
+ details.getOrElse("Hadoop Properties", Nil),
details.getOrElse("System Properties", Nil),
details.getOrElse("Classpath Entries", Nil))
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index c7d3cd3..825fc54 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -352,6 +352,7 @@ class VersionInfo private[spark](
class ApplicationEnvironmentInfo private[spark] (
val runtime: RuntimeInfo,
val sparkProperties: Seq[(String, String)],
+ val hadoopProperties: Seq[(String, String)],
val systemProperties: Seq[(String, String)],
val classpathEntries: Seq[(String, String)])
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
index 3d465a3..cbb8b3c 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala
@@ -42,6 +42,8 @@ private[ui] class EnvironmentPage(
propertyHeader, jvmRow, jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(propertyHeader, propertyRow,
Utils.redact(conf, appEnv.sparkProperties.toSeq), fixedWidth = true)
+ val hadoopPropertiesTable = UIUtils.listingTable(propertyHeader, propertyRow,
+ Utils.redact(conf, appEnv.hadoopProperties.toSeq), fixedWidth = true)
val systemPropertiesTable = UIUtils.listingTable(
propertyHeader, propertyRow, appEnv.systemProperties, fixedWidth = true)
val classpathEntriesTable = UIUtils.listingTable(
@@ -70,26 +72,37 @@ private[ui] class EnvironmentPage(
<div class="aggregated-sparkProperties collapsible-table">
{sparkPropertiesTable}
</div>
+ <span class="collapse-aggregated-hadoopProperties collapse-table"
+ onClick="collapseTable('collapse-aggregated-hadoopProperties',
+ 'aggregated-hadoopProperties')">
+ <h4>
+ <span class="collapse-table-arrow arrow-closed"></span>
+ <a>Hadoop Properties</a>
+ </h4>
+ </span>
+ <div class="aggregated-hadoopProperties collapsible-table collapsed">
+ {hadoopPropertiesTable}
+ </div>
<span class="collapse-aggregated-systemProperties collapse-table"
onClick="collapseTable('collapse-aggregated-systemProperties',
'aggregated-systemProperties')">
<h4>
- <span class="collapse-table-arrow arrow-open"></span>
+ <span class="collapse-table-arrow arrow-closed"></span>
<a>System Properties</a>
</h4>
</span>
- <div class="aggregated-systemProperties collapsible-table">
+ <div class="aggregated-systemProperties collapsible-table collapsed">
{systemPropertiesTable}
</div>
<span class="collapse-aggregated-classpathEntries collapse-table"
onClick="collapseTable('collapse-aggregated-classpathEntries',
'aggregated-classpathEntries')">
<h4>
- <span class="collapse-table-arrow arrow-open"></span>
+ <span class="collapse-table-arrow arrow-closed"></span>
<a>Classpath Entries</a>
</h4>
</span>
- <div class="aggregated-classpathEntries collapsible-table">
+ <div class="aggregated-classpathEntries collapsible-table collapsed">
{classpathEntriesTable}
</div>
</span>
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 348291f..3370152 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -171,11 +171,13 @@ private[spark] object JsonProtocol {
val environmentDetails = environmentUpdate.environmentDetails
val jvmInformation = mapToJson(environmentDetails("JVM Information").toMap)
val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap)
+ val hadoopProperties = mapToJson(environmentDetails("Hadoop Properties").toMap)
val systemProperties = mapToJson(environmentDetails("System Properties").toMap)
val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap)
("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate) ~
("JVM Information" -> jvmInformation) ~
("Spark Properties" -> sparkProperties) ~
+ ("Hadoop Properties" -> hadoopProperties) ~
("System Properties" -> systemProperties) ~
("Classpath Entries" -> classpathEntries)
}
@@ -653,9 +655,13 @@ private[spark] object JsonProtocol {
}
def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
+ // For compatibility with older event logs, which have no "Hadoop Properties" section
+ val hadoopProperties = jsonOption(json \ "Hadoop Properties").map(mapFromJson(_).toSeq)
+ .getOrElse(Seq.empty)
val environmentDetails = Map[String, Seq[(String, String)]](
"JVM Information" -> mapFromJson(json \ "JVM Information").toSeq,
"Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq,
+ "Hadoop Properties" -> hadoopProperties,
"System Properties" -> mapFromJson(json \ "System Properties").toSeq,
"Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq)
SparkListenerEnvironmentUpdate(environmentDetails)
diff --git a/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json b/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
index 4ed0538..a646172 100644
--- a/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/app_environment_expectation.json
@@ -32,6 +32,11 @@
[ "spark.app.id", "app-20161116163331-0000" ],
[ "spark.task.maxFailures", "4" ]
],
+ "hadoopProperties" : [
+ [ "mapreduce.jobtracker.address", "local" ],
+ [ "yarn.resourcemanager.scheduler.monitor.policies", "org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy" ],
+ [ "mapreduce.jobhistory.client.thread-count", "10" ]
+ ],
"systemProperties" : [
[ "java.io.tmpdir", "/var/folders/l4/d46wlzj16593f3d812vk49tw0000gp/T/" ],
[ "line.separator", "\n" ],
diff --git a/core/src/test/resources/spark-events/app-20161116163331-0000 b/core/src/test/resources/spark-events/app-20161116163331-0000
index 57cfc5b..8f77fdd 100755
--- a/core/src/test/resources/spark-events/app-20161116163331-0000
+++ b/core/src/test/resources/spark-events/app-20161116163331-0000
@@ -1,6 +1,6 @@
{"Event":"SparkListenerLogStart","Spark Version":"2.1.0-SNAPSHOT"}
{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"172.22.0.167","Port":51475},"Maximum Memory":908381388,"Timestamp":1479335611477,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
-{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre","Java Version":"1.8.0_92 (Oracle Corporation)","Scala Version":"version 2.11.8"},"Spark Properties":{"spark.blacklist.task.maxTaskAttemptsPerExecutor":"3","spark.blacklist.enabled":"TRUE","spark.driver.host":"172.22.0.167","spark.blacklist.task.maxTaskAttemptsPerNode":"3","spark.eventLog.enabled":"TRUE","spark.driver.port":"51459","spark.repl.clas [...]
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/Library/Java/JavaVirtualMachines/jdk1.8.0_92.jdk/Contents/Home/jre","Java Version":"1.8.0_92 (Oracle Corporation)","Scala Version":"version 2.11.8"},"Spark Properties":{"spark.blacklist.task.maxTaskAttemptsPerExecutor":"3","spark.blacklist.enabled":"TRUE","spark.driver.host":"172.22.0.167","spark.blacklist.task.maxTaskAttemptsPerNode":"3","spark.eventLog.enabled":"TRUE","spark.driver.port":"51459","spark.repl.clas [...]
{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"app-20161116163331-0000","Timestamp":1479335609916,"User":"jose"}
{"Event":"SparkListenerExecutorAdded","Timestamp":1479335615320,"Executor ID":"3","Executor Info":{"Host":"172.22.0.167","Total Cores":4,"Log Urls":{"stdout":"http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stdout","stderr":"http://172.22.0.167:51466/logPage/?appId=app-20161116163331-0000&executorId=3&logType=stderr"}}}
{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"3","Host":"172.22.0.167","Port":51485},"Maximum Memory":908381388,"Timestamp":1479335615387,"Maximum Onheap Memory":384093388,"Maximum Offheap Memory":524288000}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index cc32a0a..74574e2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -624,6 +624,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
"test", Some("attempt1")),
SparkListenerEnvironmentUpdate(Map(
"Spark Properties" -> properties.toSeq,
+ "Hadoop Properties" -> Seq.empty,
"JVM Information" -> Seq.empty,
"System Properties" -> Seq.empty,
"Classpath Entries" -> Seq.empty
@@ -882,6 +883,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
SparkListenerApplicationStart("end-event-test", Some("end-event-test"), 1L, "test", None),
SparkListenerEnvironmentUpdate(Map(
"Spark Properties" -> Seq.empty,
+ "Hadoop Properties" -> Seq.empty,
"JVM Information" -> Seq.empty,
"System Properties" -> Seq.empty,
"Classpath Entries" -> Seq.empty
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index c0cb158..811b9fe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -108,8 +108,9 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val secretPassword = "secret_password"
val conf = getLoggingConf(testDirPath, None)
.set(key, secretPassword)
+ val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
- val envDetails = SparkEnv.environmentDetails(conf, "FIFO", Seq.empty, Seq.empty)
+ val envDetails = SparkEnv.environmentDetails(conf, hadoopconf, "FIFO", Seq.empty, Seq.empty)
val event = SparkListenerEnvironmentUpdate(envDetails)
val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
assert(redactedProps(key) == "*********(redacted)")
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index b88f257..c63f04d 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -66,6 +66,7 @@ class JsonProtocolSuite extends SparkFunSuite {
val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
"JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
"Spark Properties" -> Seq(("Job throughput", "80000 jobs/s, regardless of job type")),
+ "Hadoop Properties" -> Seq(("hadoop.tmp.dir", "/usr/local/hadoop/tmp")),
"System Properties" -> Seq(("Username", "guest"), ("Password", "guest")),
"Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))
))
@@ -1761,6 +1762,9 @@ private[spark] object JsonProtocolSuite extends Assertions {
| "Spark Properties": {
| "Job throughput": "80000 jobs/s, regardless of job type"
| },
+ | "Hadoop Properties": {
+ | "hadoop.tmp.dir": "/usr/local/hadoop/tmp"
+ | },
| "System Properties": {
| "Username": "guest",
| "Password": "guest"
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index cf8d9f3..3e232ba 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -227,6 +227,9 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.setActiveContext"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkContext.markPartiallyConstructed"),
+ // [SPARK-26457] Show hadoop configurations in HistoryServer environment tab
+ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ApplicationEnvironmentInfo.this"),
+
// Data Source V2 API changes
(problem: Problem) => problem match {
case MissingClassProblem(cls) =>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org