You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2021/08/04 12:17:23 UTC

[spark] branch branch-3.1 updated: [SPARK-36354][CORE] EventLogFileReader should skip rolling event log directories with no logs

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 6924e70  [SPARK-36354][CORE] EventLogFileReader should skip rolling event log directories with no logs
6924e70 is described below

commit 6924e70c430049975b2562c024526f8370cf4992
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Wed Aug 4 20:26:06 2021 +0900

    [SPARK-36354][CORE] EventLogFileReader should skip rolling event log directories with no logs
    
    ### What changes were proposed in this pull request?
    
    This PR aims to skip rolling event log directories that contain only an `appstatus` file and no event log files.
    
    ### Why are the changes needed?
    
    Currently, the Spark History Server logs an `IllegalArgumentException` warning for such a directory, even though its event log may simply arrive later. The same situation can also happen when a job is killed before it uploads its first event log to remote storage such as S3.
    ```
    21/07/30 07:38:26 WARN FsHistoryProvider:
    Error while reading new log s3a://.../eventlog_v2_spark-95b5c736c8e44037afcf152534d08771
    java.lang.IllegalArgumentException: requirement failed:
    Log directory must contain at least one event log file!
    ...
    at org.apache.spark.deploy.history.RollingEventLogFilesFileReader.files$lzycompute(EventLogFileReaders.scala:216)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
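    Yes. Users will no longer see these `IllegalArgumentException` warnings; such directories are skipped (logged at DEBUG level) until they contain an event log file.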
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    Closes #33586 from dongjoon-hyun/SPARK-36354.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
    (cherry picked from commit 28a2a2238fbaf4fad3c98cfef2b3049c1f4616c8)
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../spark/deploy/history/EventLogFileReaders.scala | 10 ++++++--
 .../deploy/history/EventLogFileReadersSuite.scala  |  2 ++
 .../deploy/history/FsHistoryProviderSuite.scala    | 29 ++++++++++++++++++++++
 3 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
index b4771c8..b21c67a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSInputStream
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.history.EventLogFileWriter.codecName
+import org.apache.spark.internal.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.Utils
 
@@ -96,7 +97,7 @@ abstract class EventLogFileReader(
   def totalSize: Long
 }
 
-object EventLogFileReader {
+object EventLogFileReader extends Logging {
   // A cache for compression codecs to avoid creating the same codec many times
   private val codecMap = new ConcurrentHashMap[String, CompressionCodec]()
 
@@ -118,7 +119,12 @@ object EventLogFileReader {
     if (isSingleEventLog(status)) {
       Some(new SingleFileEventLogFileReader(fs, status.getPath, Option(status)))
     } else if (isRollingEventLogs(status)) {
-      Some(new RollingEventLogFilesFileReader(fs, status.getPath))
+      if (fs.listStatus(status.getPath).exists(RollingEventLogFilesWriter.isEventLogFile)) {
+        Some(new RollingEventLogFilesFileReader(fs, status.getPath))
+      } else {
+        logDebug(s"Rolling event log directory have no event log file at ${status.getPath}")
+        None
+      }
     } else {
       None
     }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
index 8eab2da..0bdc015 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
@@ -77,6 +77,8 @@ abstract class EventLogFileReadersSuite extends SparkFunSuite with LocalSparkCon
         }
       } else {
         fileSystem.mkdirs(path)
+        fileSystem.create(getAppStatusFilePath(path, "app", None, true))
+        fileSystem.create(getEventLogFilePath(path, "app", None, 1, None))
       }
 
       val reader = EventLogFileReader(fileSystem, path)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 3b86777..ca36651 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -1524,6 +1524,35 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
 
+  test("SPARK-36354: EventLogFileReader should skip rolling event log directories with no logs") {
+    withTempDir { dir =>
+      val conf = createTestConf(true)
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+      writer.start()
+
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+      provider.checkForLogs()
+      provider.cleanLogs()
+      assert(dir.listFiles().size === 1)
+      assert(provider.getListing.length === 1)
+
+      // Manually delete event log files and create event log file reader
+      val eventLogDir = dir.listFiles().head
+      eventLogDir.listFiles
+        .filter(f => RollingEventLogFilesWriter.isEventLogFile(f.getName))
+        .foreach(f => f.delete())
+      EventLogFileReader(fs, new Path(eventLogDir.getAbsolutePath)).map(_.lastIndex)
+    }
+  }
+
   test("SPARK-33215: check ui view permissions without retrieving ui") {
     val conf = createTestConf()
       .set(HISTORY_SERVER_UI_ACLS_ENABLE, true)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org