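You are viewing a plain-text version of this content. The canonical (linked) version of this message lives in the mailing-list archive; its hyperlink was not preserved in this plain-text rendering.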
Posted to commits@spark.apache.org by ka...@apache.org on 2020/10/19 06:18:22 UTC
[spark] 02/02: Revert "Revert "[SPARK-33146][CORE] Check for non-fatal errors when loading new applications in SHS""
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
commit 02f80cf293739f4d2881316897dbdcea74daa0bc
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Thu Oct 15 15:28:52 2020 +0900
Revert "Revert "[SPARK-33146][CORE] Check for non-fatal errors when loading new applications in SHS""
This reverts commit e40c147a5d194adbba13f12590959dc68347ec14.
---
.../spark/deploy/history/FsHistoryProvider.scala | 3 ++
.../deploy/history/FsHistoryProviderSuite.scala | 49 ++++++++++++++++++++++
2 files changed, 52 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index f5e7c4fa..7e63d55 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -527,6 +527,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
reader.fileSizeForLastIndex > 0
} catch {
case _: FileNotFoundException => false
+ case NonFatal(e) =>
+ logWarning(s"Error while reading new log ${reader.rootPath}", e)
+ false
}
case NonFatal(e) =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index c2f34fc..f3beb35 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1470,6 +1470,55 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
}
+ test("SPARK-33146: don't let one bad rolling log folder prevent loading other applications") {
+ withTempDir { dir =>
+ val conf = createTestConf(true)
+ conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+ val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+ val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+ val provider = new FsHistoryProvider(conf)
+
+ val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+ writer.start()
+ writeEventsToRollingWriter(writer, Seq(
+ SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+ provider.checkForLogs()
+ provider.cleanLogs()
+ assert(dir.listFiles().size === 1)
+ assert(provider.getListing.length === 1)
+
+ // Manually delete the appstatus file to make an invalid rolling event log
+ val appStatusPath = RollingEventLogFilesWriter.getAppStatusFilePath(new Path(writer.logPath),
+ "app", None, true)
+ fs.delete(appStatusPath, false)
+ provider.checkForLogs()
+ provider.cleanLogs()
+ assert(provider.getListing.length === 0)
+
+ // Create a new application
+ val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI, conf, hadoopConf)
+ writer2.start()
+ writeEventsToRollingWriter(writer2, Seq(
+ SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+ // Both folders exist but only one application found
+ provider.checkForLogs()
+ provider.cleanLogs()
+ assert(provider.getListing.length === 1)
+ assert(dir.listFiles().size === 2)
+
+ // Make sure a new provider sees the valid application, then release it like the first one
+ provider.stop()
+ val newProvider = new FsHistoryProvider(conf)
+ newProvider.checkForLogs()
+ assert(newProvider.getListing.length === 1)
+ newProvider.stop()
+ }
+ }
+
/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org