Posted to commits@spark.apache.org by do...@apache.org on 2022/03/06 23:49:40 UTC
[spark] branch branch-3.0 updated: [SPARK-38411][CORE] Use `UTF-8` when `doMergeApplicationListingInternal` reads event logs
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e036de3 [SPARK-38411][CORE] Use `UTF-8` when `doMergeApplicationListingInternal` reads event logs
e036de3 is described below
commit e036de326bdc6bc828eee910861851d52c81f6d5
Author: Cheng Pan <ch...@apache.org>
AuthorDate: Sun Mar 6 15:41:20 2022 -0800
[SPARK-38411][CORE] Use `UTF-8` when `doMergeApplicationListingInternal` reads event logs
### What changes were proposed in this pull request?
Use UTF-8 instead of the system default encoding to read event logs.
### Why are the changes needed?
After SPARK-29160, event logs should always be read as UTF-8. Otherwise, if the Spark History Server runs with a default charset other than "UTF-8", it fails with an error like the following:
```
2022-03-04 12:16:00,143 [3752440] - INFO [log-replay-executor-19:Logging57] - Parsing hdfs://hz-cluster11/spark2-history/application_1640597251469_2453817_1.lz4 for listing data...
2022-03-04 12:16:00,145 [3752442] - ERROR [log-replay-executor-18:Logging94] - Exception while merging application listings
java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:884)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:511)
at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:82)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListing$4(FsHistoryProvider.scala:819)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListing$4$adapted(FsHistoryProvider.scala:801)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2626)
at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:801)
at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:715)
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:581)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
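The failure can be reproduced outside Spark. The following is a minimal sketch, not code from this patch: the object name `EventLogCharsetSketch`, the helper `readLines`, and the sample JSON line are hypothetical, and `US-ASCII` is assumed to stand in for the default charset of a JVM started under a POSIX locale. It decodes the same UTF-8 bytes once with an ASCII codec and once with `Codec.UTF8`.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.{MalformedInputException, StandardCharsets}

import scala.io.{Codec, Source}
import scala.util.Try

object EventLogCharsetSketch {
  // Hypothetical sample line containing non-ASCII characters, encoded as UTF-8.
  val sampleLine: String = "{\"spark.user.key\":\"计算圆周率\"}"
  val utf8Bytes: Array[Byte] = sampleLine.getBytes(StandardCharsets.UTF_8)

  // Reads all lines from the UTF-8 bytes, decoding them with the given codec.
  def readLines(codec: Codec): List[String] =
    Source.fromInputStream(new ByteArrayInputStream(utf8Bytes))(codec).getLines().toList

  def main(args: Array[String]): Unit = {
    // Assumption: US-ASCII approximates the JVM default charset under LC_ALL=POSIX.
    // Decoding multi-byte UTF-8 input with it is expected to fail.
    println(Try(readLines(Codec("US-ASCII"))))
    // Passing Codec.UTF8 explicitly makes the result independent of the JVM default.
    println(Try(readLines(Codec.UTF8)))
  }
}
```

The explicit codec is the same mechanism the patch applies to `Source.fromInputStream` in `FsHistoryProvider`.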
### Does this PR introduce _any_ user-facing change?
Yes, bug fix.
### How was this patch tested?
Verification steps in ubuntu:20.04
1. build `spark-3.3.0-SNAPSHOT-bin-master.tgz` on commit `34618a7ef6` using `dev/make-distribution.sh --tgz --name master`
2. build `spark-3.3.0-SNAPSHOT-bin-SPARK-38411.tgz` on commit `2a8f56038b` using `dev/make-distribution.sh --tgz --name SPARK-38411`
3. switch to UTF-8 using `export LC_ALL=C.UTF-8 && bash`
4. generate an event log that contains non-ASCII characters.
```
bin/spark-submit \
--master local[*] \
--class org.apache.spark.examples.SparkPi \
--conf spark.eventLog.enabled=true \
--conf spark.user.key='计算圆周率' \
examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar
```
5. switch to POSIX using `export LC_ALL=POSIX && bash`
6. run `spark-3.3.0-SNAPSHOT-bin-master/sbin/start-history-server.sh` and watch logs
<details>
```
Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /spark-3.3.0-SNAPSHOT-bin-master/conf/:/spark-3.3.0-SNAPSHOT-bin-master/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
22/03/06 13:37:19 INFO HistoryServer: Started daemon with process name: 48729c3ffc10aa9
22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for TERM
22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for HUP
22/03/06 13:37:19 INFO SignalUtils: Registering signal handler for INT
22/03/06 13:37:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/06 13:37:21 INFO SecurityManager: Changing view acls to: root
22/03/06 13:37:21 INFO SecurityManager: Changing modify acls to: root
22/03/06 13:37:21 INFO SecurityManager: Changing view acls groups to:
22/03/06 13:37:21 INFO SecurityManager: Changing modify acls groups to:
22/03/06 13:37:21 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
22/03/06 13:37:21 INFO FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions:
22/03/06 13:37:22 INFO Utils: Successfully started service 'HistoryServerUI' on port 18080.
22/03/06 13:37:23 INFO HistoryServer: Bound HistoryServer to 0.0.0.0, and started at http://29c3ffc10aa9:18080
22/03/06 13:37:23 INFO FsHistoryProvider: Parsing file:/tmp/spark-events/local-1646573251839 for listing data...
22/03/06 13:37:25 ERROR FsHistoryProvider: Exception while merging application listings
java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:281) ~[?:1.8.0_312]
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339) ~[?:1.8.0_312]
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[?:1.8.0_312]
at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[?:1.8.0_312]
at java.io.BufferedReader.fill(BufferedReader.java:161) ~[?:1.8.0_312]
at java.io.BufferedReader.readLine(BufferedReader.java:324) ~[?:1.8.0_312]
at java.io.BufferedReader.readLine(BufferedReader.java:389) ~[?:1.8.0_312]
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$20.hasNext(Iterator.scala:886) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:82) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListingInternal$4(FsHistoryProvider.scala:830) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$doMergeApplicationListingInternal$4$adapted(FsHistoryProvider.scala:812) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:2738) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListingInternal(FsHistoryProvider.scala:812) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.deploy.history.FsHistoryProvider.doMergeApplicationListing(FsHistoryProvider.scala:758) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.deploy.history.FsHistoryProvider.mergeApplicationListing(FsHistoryProvider.scala:718) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at org.apache.spark.deploy.history.FsHistoryProvider.$anonfun$checkForLogs$15(FsHistoryProvider.scala:584) ~[spark-core_2.12-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_312]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_312]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_312]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_312]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_312]
```
</details>
7. run `spark-3.3.0-SNAPSHOT-bin-master/sbin/stop-history-server.sh`
8. run `spark-3.3.0-SNAPSHOT-bin-SPARK-38411/sbin/start-history-server.sh` and watch logs
<details>
```
Spark Command: /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java -cp /spark-3.3.0-SNAPSHOT-bin-SPARK-38411/conf/:/spark-3.3.0-SNAPSHOT-bin-SPARK-38411/jars/* -Xmx1g org.apache.spark.deploy.history.HistoryServer
========================================
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
22/03/06 13:30:54 INFO HistoryServer: Started daemon with process name: 34729c3ffc10aa9
22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for TERM
22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for HUP
22/03/06 13:30:54 INFO SignalUtils: Registering signal handler for INT
22/03/06 13:30:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/03/06 13:30:56 INFO SecurityManager: Changing view acls to: root
22/03/06 13:30:56 INFO SecurityManager: Changing modify acls to: root
22/03/06 13:30:56 INFO SecurityManager: Changing view acls groups to:
22/03/06 13:30:56 INFO SecurityManager: Changing modify acls groups to:
22/03/06 13:30:56 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
22/03/06 13:30:56 INFO FsHistoryProvider: History server ui acls disabled; users with admin permissions: ; groups with admin permissions:
22/03/06 13:30:57 INFO Utils: Successfully started service 'HistoryServerUI' on port 18080.
22/03/06 13:30:57 INFO HistoryServer: Bound HistoryServer to 0.0.0.0, and started at http://29c3ffc10aa9:18080
22/03/06 13:30:57 INFO FsHistoryProvider: Parsing file:/tmp/spark-events/local-1646573251839 for listing data...
22/03/06 13:30:59 INFO FsHistoryProvider: Finished parsing file:/tmp/spark-events/local-1646573251839
```
</details>
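The locale switches in steps 3 and 5 matter because `Source.fromInputStream`, when called without an explicit codec, resolves an implicit `Codec` derived from the JVM default charset. A small sketch for checking which charset a given environment ends up with (the object name `DefaultCodecCheck` is hypothetical and not part of this patch):

```scala
import java.nio.charset.Charset

import scala.io.Codec

object DefaultCodecCheck {
  def main(args: Array[String]): Unit = {
    // The charset the JVM derived from its environment (for example LC_ALL) at startup.
    println(s"JVM default charset: ${Charset.defaultCharset()}")
    // The codec that scala.io.Source resolves implicitly when none is passed.
    println(s"Implicit scala.io codec: ${implicitly[Codec].charSet}")
    // The codec this patch passes explicitly; it does not depend on the environment.
    println(s"Codec.UTF8: ${Codec.UTF8.charSet}")
  }
}
```

Running it once under `LC_ALL=C.UTF-8` and once under `LC_ALL=POSIX` should show the first two values changing together while `Codec.UTF8` stays fixed.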
Closes #35730 from pan3793/SPARK-38411.
Authored-by: Cheng Pan <ch...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 135841f257fbb008aef211a5e38222940849cb26)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../scala/org/apache/spark/deploy/history/FsHistoryProvider.scala | 4 ++--
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala | 4 ++--
.../org/apache/spark/deploy/history/EventLogFileWritersSuite.scala | 4 ++--
.../scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 4 ++--
4 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index b31333f..bad81ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -26,7 +26,7 @@ import java.util.zip.ZipOutputStream
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}
import scala.util.control.NonFatal
import scala.xml.Node
@@ -785,7 +785,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}
- val source = Source.fromInputStream(in).getLines()
+ val source = Source.fromInputStream(in)(Codec.UTF8).getLines()
// Because skipping may leave the stream in the middle of a line, read the next line
// before replaying.
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b20d56c..1e3a203 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import scala.collection.mutable.ArrayBuffer
-import scala.io.Source
+import scala.io.{Codec, Source}
import com.google.common.io.ByteStreams
import org.apache.commons.io.FileUtils
@@ -615,7 +615,7 @@ class SparkSubmitSuite
runSparkSubmit(args)
val listStatus = fileSystem.listStatus(testDirPath)
val logData = EventLogFileReader.openEventLog(listStatus.last.getPath, fileSystem)
- Source.fromInputStream(logData).getLines().foreach { line =>
+ Source.fromInputStream(logData)(Codec.UTF8).getLines().foreach { line =>
assert(!line.contains("secret_password"))
}
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
index e9b739c..e557108 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileWritersSuite.scala
@@ -21,7 +21,7 @@ import java.io.{File, FileOutputStream, IOException}
import java.net.URI
import scala.collection.mutable
-import scala.io.Source
+import scala.io.{Codec, Source}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
@@ -120,7 +120,7 @@ abstract class EventLogFileWritersSuite extends SparkFunSuite with LocalSparkCon
protected def readLinesFromEventLogFile(log: Path, fs: FileSystem): List[String] = {
val logDataStream = EventLogFileReader.openEventLog(log, fs)
try {
- Source.fromInputStream(logDataStream).getLines().toList
+ Source.fromInputStream(logDataStream)(Codec.UTF8).getLines().toList
} finally {
logDataStream.close()
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6dadb78..3f7ac62 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -23,7 +23,7 @@ import java.util.{Arrays, Properties}
import scala.collection.immutable.Map
import scala.collection.mutable
import scala.collection.mutable.Set
-import scala.io.Source
+import scala.io.{Codec, Source}
import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
@@ -651,7 +651,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
}
private def readLines(in: InputStream): Seq[String] = {
- Source.fromInputStream(in).getLines().toSeq
+ Source.fromInputStream(in)(Codec.UTF8).getLines().toSeq
}
/**