You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2020/10/19 06:18:22 UTC

[spark] 02/02: Revert "Revert "[SPARK-33146][CORE] Check for non-fatal errors when loading new applications in SHS""

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git

commit 02f80cf293739f4d2881316897dbdcea74daa0bc
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Thu Oct 15 15:28:52 2020 +0900

    Revert "Revert "[SPARK-33146][CORE] Check for non-fatal errors when loading new applications in SHS""
    
    This reverts commit e40c147a5d194adbba13f12590959dc68347ec14.
---
 .../spark/deploy/history/FsHistoryProvider.scala   |  3 ++
 .../deploy/history/FsHistoryProviderSuite.scala    | 49 ++++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index f5e7c4fa..7e63d55 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -527,6 +527,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
                 reader.fileSizeForLastIndex > 0
               } catch {
                 case _: FileNotFoundException => false
+                case NonFatal(e) =>
+                  logWarning(s"Error while reading new log ${reader.rootPath}", e)
+                  false
               }
 
             case NonFatal(e) =>
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index c2f34fc..f3beb35 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1470,6 +1470,55 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
 
+  test("SPARK-33146: don't let one bad rolling log folder prevent loading other applications") {
+    withTempDir { dir =>
+      val conf = createTestConf(true)
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+      writer.start()
+
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(dir.listFiles().size === 1)  // only the first app's log entry exists in the dir
+      assert(provider.getListing.length === 1)  // and that app is listed
+
+      // Manually delete the appstatus file so the first app's rolling event log becomes invalid
+      val appStatusPath = RollingEventLogFilesWriter.getAppStatusFilePath(new Path(writer.logPath),
+        "app", None, true)
+      fs.delete(appStatusPath, false)
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(provider.getListing.length === 0)  // the broken app is no longer listed
+
+      // Create a second application whose rolling event log is left intact
+      val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI, conf, hadoopConf)
+      writer2.start()
+      writeEventsToRollingWriter(writer2, Seq(
+        SparkListenerApplicationStart("app2", Some("app2"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+      // Both log folders exist on disk, but only the valid application is listed
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(provider.getListing.length === 1)
+      assert(dir.listFiles().size === 2)  // the invalid folder was not removed
+
+      // Make sure a freshly constructed provider also lists exactly the valid application
+      provider.stop()
+      val newProvider = new FsHistoryProvider(conf)
+      newProvider.checkForLogs()
+      assert(newProvider.getListing.length === 1)
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org