You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/03/06 23:49:40 UTC

[spark] branch branch-3.0 updated: [SPARK-38411][CORE] Use `UTF-8` when `doMergeApplicationListingInternal` reads event logs

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e036de3  [SPARK-38411][CORE] Use `UTF-8` when `doMergeApplicationListingInternal` reads event logs
e036de3 is described below

commit e036de326bdc6bc828eee910861851d52c81f6d5
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Sun Mar 6 15:41:20 2022 -0800

    [SPARK-38411][CORE] Use `UTF-8` when `doMergeApplicationListingInternal` reads event logs
    
    ### What changes were proposed in this pull request?
    
    Use UTF-8 instead of the system default encoding to read event logs.
    
    ### Why are the changes needed?
    
    After SPARK-29160, we should always use UTF-8 to read event logs; otherwise, if the Spark History Server runs with a default charset other than "UTF-8", it will encounter an error like the following.
    
    ```
    2022-03-04 12:16:00,143 [3752440] - INFO  [log-replay-executor-19:Logging57] - Parsing hdfs://hz-cluster11/spark2-history/application_1640597251469_2453817_1.lz4 for listing data...
    2022-03-04 12:16:00,145 [3752442] - ERROR [log-replay-executor-18:Logging94] - Exception while merging application listings
    java.nio.charset.MalformedInputException: Input length = 1
    	at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
    	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
    	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    	at java.io.InputStreamReader.read(InputStreamReader.java:184)
    	at java.io.BufferedReader.fill(BufferedReader.java:161)
    	at java.io.BufferedReader.readLine(BufferedReader.java:324)
    	at java.io.BufferedReader.readLine(BufferedReader.java:389)
    	at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
    	at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:884)
    	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
    	at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:82)
    	at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListing$4(FsHistoryProvider.scala:819)
    	at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListing$4$adapted(FsHistoryProvider.scala:801)
    	at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2626)
    	at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:801)
    	at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:715)
    	at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:581)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, bug fix.
    
    ### How was this patch tested?
    
    Verification steps in ubuntu:20.04
    
    1. build `spark-3.3.0-SNAPSHOT-bin-master.tgz` on commit `34618a7ef6` using `dev/make-distribution.sh --tgz --name master`
    2. build `spark-3.3.0-SNAPSHOT-bin-SPARK-38411.tgz` on commit `2a8f56038b` using `dev/make-distribution.sh --tgz --name SPARK-38411`
    3. switch to UTF-8 using `export LC_ALL=C.UTF-8 && bash`
    4. generate an event log that contains non-ASCII chars.
        ```
        bin/spark-submit \
            --master local[*] \
            --class org.apache.spark.examples.SparkPi \
            --conf spark.eventLog.enabled=true \
            --conf spark.user.key='计算圆周率' \
            examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar
        ```
    5. switch to POSIX using `export LC_ALL=POSIX && bash`
    6. run `spark-3.3.0-SNAPSHOT-bin-master/sbin/start-history-server.sh` and watch logs
        <details>
    
        ```
        Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /spark-3.3.0-SNAPSHOT-bin-master/conf/:/spark-3.3.0-SNAPSHOT-bin-master/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
        ========================================
        Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
        22/03/06 13:37:19 INFO HistoryServer: Started daemon with process name: 48729c3ffc10aa9
        22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for TERM
        22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for HUP
        22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for INT
        22/03/06 13:37:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
        22/03/06 13:37:21 INFO SecurityManager: Changing view acls to: root
        22/03/06 13:37:21 INFO SecurityManager: Changing modify acls to: root
        22/03/06 13:37:21 INFO SecurityManager: Changing view acls groups to:
        22/03/06 13:37:21 INFO SecurityManager: Changing modify acls groups to:
        22/03/06 13:37:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
        22/03/06 13:37:21 INFO FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions:
        22/03/06 13:37:22 INFO Utils: Successfully started service 'HistoryServerUI' on port 18080.
        22/03/06 13:37:23 INFO HistoryServer: Bound HistoryServer to 0.0.0.0, and started at http://29c3ffc10aa9:18080
        22/03/06 13:37:23 INFO FsHistoryProvider: Parsing file:/tmp/spark-events/local-1646573251839 for listing data...
        22/03/06 13:37:25 ERROR FsHistoryProvider: Exception while merging application listings
        java.nio.charset.MalformedInputException: Input length = 1
    	    at java.nio.charset.CoderResult.throwException(CoderResult.java:281) ~[?:1.8.0_312]
    	    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339) ~[?:1.8.0_312]
    	    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[?:1.8.0_312]
    	    at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[?:1.8.0_312]
    	    at java.io.BufferedReader.fill(BufferedReader.java:161) ~[?:1.8.0_312]
    	    at java.io.BufferedReader.readLine(BufferedReader.java:324) ~[?:1.8.0_312]
    	    at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[?:1.8.0_312]
    	    at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74) ~[scala-library-2.12.15.jar:?]
    	    at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:886) ~[scala-library-2.12.15.jar:?]
    	    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513) ~[scala-library-2.12.15.jar:?]
    	    at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:82) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    	    at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListingInternal$4(FsHistoryProvider.scala:830) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    	    at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListingInternal$4$adapted(FsHistoryProvider.scala:812) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    	    at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2738) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    	    at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListingInternal(FsHistoryProvider.scala:812) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    	    at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:758) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    	    at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:718) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    	    at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:584) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
    	    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_312]
    	    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_312]
    	    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_312]
    	    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_312]
    	    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
        ```
        </details>
    7. run `spark-3.3.0-SNAPSHOT-bin-master/sbin/stop-history-server.sh`
    8. run `spark-3.3.0-SNAPSHOT-bin-SPARK-38411/sbin/start-history-server.sh` and watch logs
        <details>
    
        ```
        Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /spark-3.3.0-SNAPSHOT-bin-SPARK-38411/conf/:/spark-3.3.0-SNAPSHOT-bin-SPARK-38411/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
        ========================================
        Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
        22/03/06 13:30:54 INFO HistoryServer: Started daemon with process name: 34729c3ffc10aa9
        22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for TERM
        22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for HUP
        22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for INT
        22/03/06 13:30:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
        22/03/06 13:30:56 INFO SecurityManager: Changing view acls to: root
        22/03/06 13:30:56 INFO SecurityManager: Changing modify acls to: root
        22/03/06 13:30:56 INFO SecurityManager: Changing view acls groups to:
        22/03/06 13:30:56 INFO SecurityManager: Changing modify acls groups to:
        22/03/06 13:30:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
        22/03/06 13:30:56 INFO FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions:
        22/03/06 13:30:57 INFO Utils: Successfully started service 'HistoryServerUI' on port 18080.
        22/03/06 13:30:57 INFO HistoryServer: Bound HistoryServer to 0.0.0.0, and started at http://29c3ffc10aa9:18080
        22/03/06 13:30:57 INFO FsHistoryProvider: Parsing file:/tmp/spark-events/local-1646573251839 for listing data...
        22/03/06 13:30:59 INFO FsHistoryProvider: Finished parsing file:/tmp/spark-events/local-1646573251839
        ```
        </details>
    
    Closes #35730 from pan3793/SPARK-38411.
    
    Authored-by: Cheng Pan <ch...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 135841f257fbb008aef211a5e38222940849cb26)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../scala/org/apache/spark/deploy/history/FsHistoryProvider.scala     | 4 ++--
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala    | 4 ++--
 .../org/apache/spark/deploy/history/EventLogFileWritersSuite.scala    | 4 ++--
 .../scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala  | 4 ++--
 4 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index b31333f..bad81ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -26,7 +26,7 @@ import java.util.zip.ZipOutputStream
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}
 import scala.util.control.NonFatal
 import scala.xml.Node
 
@@ -785,7 +785,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           }
         }
 
-        val source = Source.fromInputStream(in).getLines()
+        val source = Source.fromInputStream(in)(Codec.UTF8).getLines()
 
         // Because skipping may leave the stream in the middle of a line, read the next line
         // before replaying.
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b20d56c..1e3a203 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
+import scala.io.{Codec, Source}
 
 import com.google.common.io.ByteStreams
 import org.apache.commons.io.FileUtils
@@ -615,7 +615,7 @@ class SparkSubmitSuite
       runSparkSubmit(args)
       val listStatus = fileSystem.listStatus(testDirPath)
       val logData = EventLogFileReader.openEventLog(listStatus.last.getPath, fileSystem)
-      Source.fromInputStream(logData).getLines().foreach { line =>
+      Source.fromInputStream(logData)(Codec.UTF8).getLines().foreach { line =>
         assert(!line.contains("secret_password"))
       }
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index e9b739c..e557108 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -21,7 +21,7 @@ import java.io.{File, FileOutputStream, IOException}
 import java.net.URI
 
 import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -120,7 +120,7 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
   protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = {
     val logDataStream = EventLogFileReader.openEventLog(log, fs)
     try {
-      Source.fromInputStream(logDataStream).getLines().toList
+      Source.fromInputStream(logDataStream)(Codec.UTF8).getLines().toList
     } finally {
       logDataStream.close()
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6dadb78..3f7ac62 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -23,7 +23,7 @@ import java.util.{Arrays, Properties}
 import scala.collection.immutable.Map
 import scala.collection.mutable
 import scala.collection.mutable.Set
-import scala.io.Source
+import scala.io.{Codec, Source}
 
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
@@ -651,7 +651,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   }
 
   private def readLines(in: InputStream): Seq[String] = {
-    Source.fromInputStream(in).getLines().toSeq
+    Source.fromInputStream(in)(Codec.UTF8).getLines().toSeq
   }
 
   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org