Posted to commits@spark.apache.org by mr...@apache.org on 2022/11/11 00:16:54 UTC

[spark] branch master updated: [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b5fbdeb5483 [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path
b5fbdeb5483 is described below

commit b5fbdeb5483fa4b3c6102a99fa84a0677e145c42
Author: Swaminathan Balachandran <sw...@gmail.com>
AuthorDate: Thu Nov 10 18:16:44 2022 -0600

    [SPARK-40901][CORE] Unable to store Spark Driver logs with Absolute Hadoop based URI FS Path
    
    ### What changes were proposed in this pull request?
    Parse the configured value of `spark.driver.log.dfsDir` as a Hadoop FS `Path` and qualify it against the target filesystem, instead of building the destination with `java.io.File` semantics. This fixes the problem.
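    
    In essence (a simplified sketch of the change, where `suffix` stands for `DriverLogger.DRIVER_LOG_FILE_SUFFIX`):
    
    ```scala
    // Before: destination built with java.io.File semantics, which mangle
    // URI schemes such as hdfs:// or file://
    val dfsLogFile: String = FileUtils.getFile(rootDir, appId + suffix).getAbsolutePath()
    
    // After: parse with Hadoop's Path and qualify it against the target FileSystem
    val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, appId + suffix))
    ```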
    
    ### Why are the changes needed?
    Currently, when an absolute URI on the configured filesystem is passed as the log directory, copying the local driver log to that directory fails, because the destination path is built with `java.io.File` semantics, which mangle the URI.
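    
    For example, with the old code (illustrative; the `hdfs://nn:8020` URI is hypothetical and the exact mangled path depends on the working directory):
    
    ```scala
    import org.apache.commons.io.FileUtils
    
    // java.io.File collapses the "//" after the scheme and treats the whole URI
    // as a relative local path, so getAbsolutePath() prepends the working directory:
    FileUtils.getFile("hdfs://nn:8020/logs", "app-1_driver.log").getAbsolutePath()
    // => "/current/working/dir/hdfs:/nn:8020/logs/app-1_driver.log"
    ```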
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit test cases in `DriverLoggerSuite`.
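    
    The new tests should be runnable with the usual sbt suite filter, e.g.:
    
    ```
    build/sbt "core/testOnly *DriverLoggerSuite"
    ```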
    
    Closes #38377 from swamirishi/SPARK-40901.
    
    Authored-by: Swaminathan Balachandran <sw...@gmail.com>
    Signed-off-by: Mridul <mridul<at>gmail.com>
---
 .../apache/spark/util/logging/DriverLogger.scala   |  8 ++---
 .../spark/util/logging/DriverLoggerSuite.scala     | 42 +++++++++++++++++++---
 2 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala b/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala
index bb57e032563..4f56cf24a2f 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/DriverLogger.scala
@@ -126,13 +126,13 @@ private[spark] class DriverLogger(conf: SparkConf) extends Logging {
         throw new RuntimeException(s"${rootDir} does not exist." +
           s" Please create this dir in order to persist driver logs")
       }
-      val dfsLogFile: String = FileUtils.getFile(rootDir, appId
-        + DriverLogger.DRIVER_LOG_FILE_SUFFIX).getAbsolutePath()
+      val dfsLogFile: Path = fileSystem.makeQualified(new Path(rootDir, appId
+        + DriverLogger.DRIVER_LOG_FILE_SUFFIX))
       try {
         inStream = new BufferedInputStream(new FileInputStream(localLogFile))
-        outputStream = SparkHadoopUtil.createFile(fileSystem, new Path(dfsLogFile),
+        outputStream = SparkHadoopUtil.createFile(fileSystem, dfsLogFile,
           conf.get(DRIVER_LOG_ALLOW_EC))
-        fileSystem.setPermission(new Path(dfsLogFile), LOG_FILE_PERMISSIONS)
+        fileSystem.setPermission(dfsLogFile, LOG_FILE_PERMISSIONS)
       } catch {
         case e: Exception =>
           JavaUtils.closeQuietly(inStream)
diff --git a/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala b/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala
index bd7ec242a93..9599bd29188 100644
--- a/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/logging/DriverLoggerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.util.logging
 import java.io.File
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.{SparkContext, SparkFunSuite}
@@ -65,12 +66,43 @@ class DriverLoggerSuite extends SparkFunSuite with LocalSparkContext {
     assert(dfsFile.length() > 0)
   }
 
+  test("SPARK-40901: driver logs are persisted locally and synced to dfs when log " +
+    "dir is absolute URI") {
+    val sparkConf = new SparkConf()
+    sparkConf.set(DRIVER_LOG_DFS_DIR, "file://" + rootDfsDir.getAbsolutePath())
+    val sc = getSparkContext(sparkConf)
+    val app_id = sc.applicationId
+    // Run a simple spark application
+    sc.parallelize(1 to 1000).count()
+
+    // Assert driver log file exists
+    val rootDir = Utils.getLocalDir(sc.getConf)
+    val driverLogsDir = FileUtils.getFile(rootDir, DriverLogger.DRIVER_LOG_DIR)
+    assert(driverLogsDir.exists())
+    val files = driverLogsDir.listFiles()
+    assert(files.length === 1)
+    assert(files(0).getName.equals(DriverLogger.DRIVER_LOG_FILE))
+
+    sc.stop()
+    assert(!driverLogsDir.exists())
+    assert(sc.getConf.get(DRIVER_LOG_DFS_DIR).get.startsWith("file:///"))
+    val dfsFile = new Path(sc.getConf.get(DRIVER_LOG_DFS_DIR).get +
+      "/" + app_id + DriverLogger.DRIVER_LOG_FILE_SUFFIX)
+    val dfsFileStatus = dfsFile.getFileSystem(sc.hadoopConfiguration).getFileStatus(dfsFile)
+
+    assert(dfsFileStatus.isFile)
+    assert(dfsFileStatus.getLen > 0)
+  }
+
   private def getSparkContext(): SparkContext = {
-    val conf = new SparkConf()
-    conf.set(DRIVER_LOG_DFS_DIR, rootDfsDir.getAbsolutePath())
-    conf.set(DRIVER_LOG_PERSISTTODFS, true)
-    conf.set(SparkLauncher.SPARK_MASTER, "local")
-    conf.set(SparkLauncher.DEPLOY_MODE, "client")
+    getSparkContext(new SparkConf())
+  }
+
+  private def getSparkContext(conf: SparkConf): SparkContext = {
+    conf.setIfMissing(DRIVER_LOG_DFS_DIR, rootDfsDir.getAbsolutePath())
+    conf.setIfMissing(DRIVER_LOG_PERSISTTODFS, true)
+    conf.setIfMissing(SparkLauncher.SPARK_MASTER, "local")
+    conf.setIfMissing(SparkLauncher.DEPLOY_MODE, "client")
     sc = new SparkContext("local", "DriverLogTest", conf)
     sc
   }
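
For reference, a minimal sketch of a configuration that exercises the fixed code
path (the namenode host and directory below are made up for illustration):

    import org.apache.spark.SparkConf

    // Hypothetical example: a fully qualified URI (scheme + authority) as the
    // driver log directory, which the old code mangled and this fix handles.
    val conf = new SparkConf()
      .set("spark.driver.log.persistToDfs.enabled", "true")
      .set("spark.driver.log.dfsDir", "hdfs://namenode:8020/user/spark/driverLogs")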

