You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/26 08:11:57 UTC
[spark] branch branch-3.1 updated: [SPARK-34232][CORE] Redact SparkListenerEnvironmentUpdate event in log
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 6712e28 [SPARK-34232][CORE] Redact SparkListenerEnvironmentUpdate event in log
6712e28 is described below
commit 6712e288683b40fe755d4002c58121e0fd59a817
Author: Warren Zhu <wa...@gmail.com>
AuthorDate: Tue Jan 26 17:10:53 2021 +0900
[SPARK-34232][CORE] Redact SparkListenerEnvironmentUpdate event in log
### What changes were proposed in this pull request?
Redact the SparkListenerEnvironmentUpdate event in the log message that is written when the event's processing time exceeds logSlowEventThreshold.
### Why are the changes needed?
Credentials contained in the event could be exposed in the log when the event's processing time exceeds logSlowEventThreshold.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually tested
Closes #31335 from warrenzhu25/34232.
Authored-by: Warren Zhu <wa...@gmail.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit 68b765e6b800ea7753cbf4ba5a2a5d2749eb2a57)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../apache/spark/scheduler/EventLoggingListener.scala | 16 ++++++++--------
.../main/scala/org/apache/spark/util/ListenerBus.scala | 12 +++++++++++-
.../spark/scheduler/EventLoggingListenerSuite.scala | 4 ++--
3 files changed, 21 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index d4e22d7..c57894b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -130,7 +130,7 @@ private[spark] class EventLoggingListener(
}
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
- logEvent(redactEvent(event))
+ logEvent(redactEvent(sparkConf, event))
}
// Events that trigger a flush
@@ -295,8 +295,15 @@ private[spark] class EventLoggingListener(
}
redactedProperties
}
+}
+
+private[spark] object EventLoggingListener extends Logging {
+ val DEFAULT_LOG_DIR = "/tmp/spark-events"
+ // Dummy stage key used by driver in executor metrics updates
+ val DRIVER_STAGE_KEY = (-1, -1)
private[spark] def redactEvent(
+ sparkConf: SparkConf,
event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
// environmentDetails maps a string descriptor to a set of properties
// Similar to:
@@ -312,11 +319,4 @@ private[spark] class EventLoggingListener(
}
SparkListenerEnvironmentUpdate(redactedProps)
}
-
-}
-
-private[spark] object EventLoggingListener extends Logging {
- val DEFAULT_LOG_DIR = "/tmp/spark-events"
- // Dummy stage key used by driver in executor metrics updates
- val DRIVER_STAGE_KEY = (-1, -1)
}
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 51cd7d1..3520fa8 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -27,6 +27,8 @@ import com.codahale.metrics.Timer
import org.apache.spark.SparkEnv
import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.scheduler.EventLoggingListener
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
/**
* An event bus which posts events to its listeners.
@@ -128,7 +130,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
if (maybeTimerContext != null) {
val elapsed = maybeTimerContext.stop()
if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
- logInfo(s"Process of event ${event} by listener ${listenerName} took " +
+ logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
s"${elapsed / 1000000000d}s.")
}
}
@@ -150,4 +152,12 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
}
+ private def redactEvent(e: E): E = {
+ e match {
+ case event: SparkListenerEnvironmentUpdate =>
+ EventLoggingListener.redactEvent(env.conf, event).asInstanceOf[E]
+ case _ => e
+ }
+ }
+
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 7acb845..240774d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -90,11 +90,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
val conf = getLoggingConf(testDirPath, None)
.set(key, secretPassword)
val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
- val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
val envDetails = SparkEnv.environmentDetails(
conf, hadoopconf, "FIFO", Seq.empty, Seq.empty, Seq.empty)
val event = SparkListenerEnvironmentUpdate(envDetails)
- val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
+ val redactedProps = EventLoggingListener
+ .redactEvent(conf, event).environmentDetails("Spark Properties").toMap
assert(redactedProps(key) == "*********(redacted)")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org