You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/26 08:11:57 UTC

[spark] branch branch-3.1 updated: [SPARK-34232][CORE] Redact SparkListenerEnvironmentUpdate event in log

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6712e28  [SPARK-34232][CORE] Redact SparkListenerEnvironmentUpdate event in log
6712e28 is described below

commit 6712e288683b40fe755d4002c58121e0fd59a817
Author: Warren Zhu <wa...@gmail.com>
AuthorDate: Tue Jan 26 17:10:53 2021 +0900

    [SPARK-34232][CORE] Redact SparkListenerEnvironmentUpdate event in log
    
    ### What changes were proposed in this pull request?
    Redact event SparkListenerEnvironmentUpdate in log when its processing time exceeded logSlowEventThreshold
    
    ### Why are the changes needed?
    Credentials could be exposed when its processing time exceeded logSlowEventThreshold
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manually tested
    
    Closes #31335 from warrenzhu25/34232.
    
    Authored-by: Warren Zhu <wa...@gmail.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit 68b765e6b800ea7753cbf4ba5a2a5d2749eb2a57)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../apache/spark/scheduler/EventLoggingListener.scala    | 16 ++++++++--------
 .../main/scala/org/apache/spark/util/ListenerBus.scala   | 12 +++++++++++-
 .../spark/scheduler/EventLoggingListenerSuite.scala      |  4 ++--
 3 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index d4e22d7..c57894b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -130,7 +130,7 @@ private[spark] class EventLoggingListener(
   }
 
   override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
-    logEvent(redactEvent(event))
+    logEvent(redactEvent(sparkConf, event))
   }
 
   // Events that trigger a flush
@@ -295,8 +295,15 @@ private[spark] class EventLoggingListener(
     }
     redactedProperties
   }
+}
+
+private[spark] object EventLoggingListener extends Logging {
+  val DEFAULT_LOG_DIR = "/tmp/spark-events"
+  // Dummy stage key used by driver in executor metrics updates
+  val DRIVER_STAGE_KEY = (-1, -1)
 
   private[spark] def redactEvent(
+      sparkConf: SparkConf,
       event: SparkListenerEnvironmentUpdate): SparkListenerEnvironmentUpdate = {
     // environmentDetails maps a string descriptor to a set of properties
     // Similar to:
@@ -312,11 +319,4 @@ private[spark] class EventLoggingListener(
     }
     SparkListenerEnvironmentUpdate(redactedProps)
   }
-
-}
-
-private[spark] object EventLoggingListener extends Logging {
-  val DEFAULT_LOG_DIR = "/tmp/spark-events"
-  // Dummy stage key used by driver in executor metrics updates
-  val DRIVER_STAGE_KEY = (-1, -1)
 }
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index 51cd7d1..3520fa8 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -27,6 +27,8 @@ import com.codahale.metrics.Timer
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.scheduler.EventLoggingListener
+import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate
 
 /**
  * An event bus which posts events to its listeners.
@@ -128,7 +130,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
         if (maybeTimerContext != null) {
           val elapsed = maybeTimerContext.stop()
           if (logSlowEventEnabled && elapsed > logSlowEventThreshold) {
-            logInfo(s"Process of event ${event} by listener ${listenerName} took " +
+            logInfo(s"Process of event ${redactEvent(event)} by listener ${listenerName} took " +
               s"${elapsed / 1000000000d}s.")
           }
         }
@@ -150,4 +152,12 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
     listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
   }
 
+  private def redactEvent(e: E): E = {
+    e match {
+      case event: SparkListenerEnvironmentUpdate =>
+        EventLoggingListener.redactEvent(env.conf, event).asInstanceOf[E]
+      case _ => e
+    }
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 7acb845..240774d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -90,11 +90,11 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     val conf = getLoggingConf(testDirPath, None)
       .set(key, secretPassword)
     val hadoopconf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
-    val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
     val envDetails = SparkEnv.environmentDetails(
       conf, hadoopconf, "FIFO", Seq.empty, Seq.empty, Seq.empty)
     val event = SparkListenerEnvironmentUpdate(envDetails)
-    val redactedProps = eventLogger.redactEvent(event).environmentDetails("Spark Properties").toMap
+    val redactedProps = EventLoggingListener
+      .redactEvent(conf, event).environmentDetails("Spark Properties").toMap
     assert(redactedProps(key) == "*********(redacted)")
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org