You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2021/08/04 12:17:23 UTC
[spark] branch branch-3.1 updated: [SPARK-36354][CORE]
EventLogFileReader should skip rolling event log directories with no logs
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 6924e70 [SPARK-36354][CORE] EventLogFileReader should skip rolling event log directories with no logs
6924e70 is described below
commit 6924e70c430049975b2562c024526f8370cf4992
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Wed Aug 4 20:26:06 2021 +0900
[SPARK-36354][CORE] EventLogFileReader should skip rolling event log directories with no logs
### What changes were proposed in this pull request?
This PR aims to skip rolling event log directories which contain only an `appstatus` file.
### Why are the changes needed?
Currently, the Spark History Server shows an `IllegalArgumentException` warning for such a directory, even though the event log might arrive later. The situation can also happen when the job is killed before uploading its first log to remote storage like S3.
```
21/07/30 07:38:26 WARN FsHistoryProvider:
Error while reading new log s3a://.../eventlog_v2_spark-95b5c736c8e44037afcf152534d08771
java.lang.IllegalArgumentException: requirement failed:
Log directory must contain at least one event log file!
...
at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:216)
```
### Does this PR introduce _any_ user-facing change?
Yes. Users will no longer see `IllegalArgumentException` warnings for rolling event log directories that do not yet contain an event log file.
### How was this patch tested?
Pass the CIs with the newly added test case.
Closes #33586 from dongjoon-hyun/SPARK-36354.
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
(cherry picked from commit 28a2a2238fbaf4fad3c98cfef2b3049c1f4616c8)
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../spark/deploy/history/EventLogFileReaders.scala | 10 ++++++--
.../deploy/history/EventLogFileReadersSuite.scala | 2 ++
.../deploy/history/FsHistoryProviderSuite.scala | 29 ++++++++++++++++++++++
3 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
index b4771c8..b21c67a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSInputStream
import org.apache.spark.SparkConf
import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.Utils
@@ -96,7 +97,7 @@ abstract class EventLogFileReader(
def totalSize: Long
}
-object EventLogFileReader {
+object EventLogFileReader extends Logging {
// A cache for compression codecs to avoid creating the same codec many times
private val codecMap = new ConcurrentHashMap[String, CompressionCodec]()
@@ -118,7 +119,12 @@ object EventLogFileReader {
if (isSingleEventLog(status)) {
Some(new SingleFileEventLogFileReader(fs, status.getPath, Option(status)))
} else if (isRollingEventLogs(status)) {
- Some(new RollingEventLogFilesFileReader(fs, status.getPath))
+ if (fs.listStatus(status.getPath).exists(RollingEventLogFilesWriter.isEventLogFile)) {
+ Some(new RollingEventLogFilesFileReader(fs, status.getPath))
+ } else {
+ logDebug(s"Rolling event log directory have no event log file at ${status.getPath}")
+ None
+ }
} else {
None
}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
index 8eab2da..0bdc015 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
@@ -77,6 +77,8 @@ abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkCon
}
} else {
fileSystem.mkdirs(path)
+ fileSystem.create(getAppStatusFilePath(path, "app", None, true))
+ fileSystem.create(getEventLogFilePath(path, "app", None, 1, None))
}
val reader = EventLogFileReader(fileSystem, path)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 3b86777..ca36651 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1524,6 +1524,35 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
}
+ test("SPARK-36354: EventLogFileReader should skip rolling event log directories with no logs") {
+ withTempDir { dir =>
+ val conf = createTestConf(true)
+ conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+ val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+ val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+ val provider = new FsHistoryProvider(conf)
+
+ val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+ writer.start()
+
+ writeEventsToRollingWriter(writer, Seq(
+ SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+ provider.checkForLogs()
+ provider.cleanLogs()
+ assert(dir.listFiles().size === 1)
+ assert(provider.getListing.length === 1)
+
+ // Manually delete event log files and create event log file reader
+ val eventLogDir = dir.listFiles().head
+ eventLogDir.listFiles
+ .filter(f => RollingEventLogFilesWriter.isEventLogFile(f.getName))
+ .foreach(f => f.delete())
+ EventLogFileReader(fs, new Path(eventLogDir.getAbsolutePath)).map(_.lastIndex)
+ }
+ }
+
test("SPARK-33215: check ui view permissions without retrieving ui") {
val conf = createTestConf()
.set(HISTORY_SERVER_UI_ACLS_ENABLE, true)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org