You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2016/08/17 18:43:22 UTC

spark git commit: [SPARK-16736][CORE][SQL] purge superfluous fs calls

Repository: spark
Updated Branches:
  refs/heads/master 4d92af310 -> cc97ea188


[SPARK-16736][CORE][SQL] purge superfluous fs calls

A review of the code, working back from Hadoop's `FileSystem.exists()` and `FileSystem.isDirectory()` code, then removing uses of the calls when superfluous.

1. delete is harmless if called on a nonexistent path, so don't do any checks before deletes
1. any `FileSystem.exists()`  check before `getFileStatus()` or `open()` is superfluous as the operation itself does the check. Instead the `FileNotFoundException` is caught and triggers the downgraded path. When a `FileNotFoundException` was thrown before, the code still creates a new FNFE with the error messages. Though now the inner exceptions are nested, for easier diagnostics.

Initially, relying on Jenkins test runs.

One troublespot here is that some of the codepaths are clearly error situations; it's not clear that they have coverage anyway. Trying to create the failure conditions in tests would be ideal, but it will also be hard.

Author: Steve Loughran <st...@apache.org>

Closes #14371 from steveloughran/cloud/SPARK-16736-superfluous-fs-calls.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc97ea18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc97ea18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc97ea18

Branch: refs/heads/master
Commit: cc97ea188e1d5b8e851d1a8438b8af092783ec04
Parents: 4d92af3
Author: Steve Loughran <st...@apache.org>
Authored: Wed Aug 17 11:42:57 2016 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Aug 17 11:43:01 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  3 --
 .../deploy/history/FsHistoryProvider.scala      | 27 ++++++++---------
 .../spark/rdd/ReliableCheckpointRDD.scala       | 31 ++++++++-----------
 .../spark/rdd/ReliableRDDCheckpointData.scala   |  7 +----
 .../spark/scheduler/EventLoggingListener.scala  | 13 ++------
 .../apache/spark/repl/ExecutorClassLoader.scala |  9 +++---
 .../state/HDFSBackedStateStoreProvider.scala    | 32 +++++++++++---------
 .../sql/hive/JavaMetastoreDataSourcesSuite.java |  4 +--
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  2 +-
 .../org/apache/spark/streaming/Checkpoint.scala | 17 +++++------
 .../streaming/util/FileBasedWriteAheadLog.scala | 27 ++++++++++++-----
 .../apache/spark/streaming/util/HdfsUtils.scala | 24 +++++++--------
 .../org/apache/spark/deploy/yarn/Client.scala   |  5 ++-
 13 files changed, 92 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a6853fe..60f042f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1410,9 +1410,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     val scheme = new URI(schemeCorrectedPath).getScheme
     if (!Array("http", "https", "ftp").contains(scheme)) {
       val fs = hadoopPath.getFileSystem(hadoopConfiguration)
-      if (!fs.exists(hadoopPath)) {
-        throw new FileNotFoundException(s"Added file $hadoopPath does not exist.")
-      }
       val isDir = fs.getFileStatus(hadoopPath).isDirectory
       if (!isLocal && scheme == "file" && isDir) {
         throw new SparkException(s"addFile does not support local directories when not running " +

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index bc09935..6874aa5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -193,16 +193,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   private def startPolling(): Unit = {
     // Validate the log directory.
     val path = new Path(logDir)
-    if (!fs.exists(path)) {
-      var msg = s"Log directory specified does not exist: $logDir"
-      if (logDir == DEFAULT_LOG_DIR) {
-        msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+    try {
+      if (!fs.getFileStatus(path).isDirectory) {
+        throw new IllegalArgumentException(
+          "Logging directory specified is not a directory: %s".format(logDir))
       }
-      throw new IllegalArgumentException(msg)
-    }
-    if (!fs.getFileStatus(path).isDirectory) {
-      throw new IllegalArgumentException(
-        "Logging directory specified is not a directory: %s".format(logDir))
+    } catch {
+      case f: FileNotFoundException =>
+        var msg = s"Log directory specified does not exist: $logDir"
+        if (logDir == DEFAULT_LOG_DIR) {
+          msg += " Did you configure the correct one through spark.history.fs.logDirectory?"
+        }
+        throw new FileNotFoundException(msg).initCause(f)
     }
 
     // Disable the background thread during tests.
@@ -495,12 +497,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
       attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, attempt.logPath)
-          if (fs.exists(path)) {
-            if (!fs.delete(path, true)) {
-              logWarning(s"Error deleting ${path}")
-            }
-          }
+          fs.delete(new Path(logDir, attempt.logPath), true)
         } catch {
           case e: AccessControlException =>
             logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index fddb935..ab6554f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -166,9 +166,6 @@ private[spark] object ReliableCheckpointRDD extends Logging {
     val tempOutputPath =
       new Path(outputDir, s".$finalOutputName-attempt-${ctx.attemptNumber()}")
 
-    if (fs.exists(tempOutputPath)) {
-      throw new IOException(s"Checkpoint failed: temporary path $tempOutputPath already exists")
-    }
     val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
 
     val fileOutputStream = if (blockSize < 0) {
@@ -240,22 +237,20 @@ private[spark] object ReliableCheckpointRDD extends Logging {
       val bufferSize = sc.conf.getInt("spark.buffer.size", 65536)
       val partitionerFilePath = new Path(checkpointDirPath, checkpointPartitionerFileName)
       val fs = partitionerFilePath.getFileSystem(sc.hadoopConfiguration)
-      if (fs.exists(partitionerFilePath)) {
-        val fileInputStream = fs.open(partitionerFilePath, bufferSize)
-        val serializer = SparkEnv.get.serializer.newInstance()
-        val deserializeStream = serializer.deserializeStream(fileInputStream)
-        val partitioner = Utils.tryWithSafeFinally[Partitioner] {
-          deserializeStream.readObject[Partitioner]
-        } {
-          deserializeStream.close()
-        }
-        logDebug(s"Read partitioner from $partitionerFilePath")
-        Some(partitioner)
-      } else {
-        logDebug("No partitioner file")
-        None
+      val fileInputStream = fs.open(partitionerFilePath, bufferSize)
+      val serializer = SparkEnv.get.serializer.newInstance()
+      val deserializeStream = serializer.deserializeStream(fileInputStream)
+      val partitioner = Utils.tryWithSafeFinally[Partitioner] {
+        deserializeStream.readObject[Partitioner]
+      } {
+        deserializeStream.close()
       }
+      logDebug(s"Read partitioner from $partitionerFilePath")
+      Some(partitioner)
     } catch {
+      case e: FileNotFoundException =>
+        logDebug("No partitioner file", e)
+        None
       case NonFatal(e) =>
         logWarning(s"Error reading partitioner from $checkpointDirPath, " +
             s"partitioner will not be recovered which may lead to performance loss", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
index 74f1876..b6d723c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableRDDCheckpointData.scala
@@ -80,12 +80,7 @@ private[spark] object ReliableRDDCheckpointData extends Logging {
   /** Clean up the files associated with the checkpoint data for this RDD. */
   def cleanCheckpoint(sc: SparkContext, rddId: Int): Unit = {
     checkpointPath(sc, rddId).foreach { path =>
-      val fs = path.getFileSystem(sc.hadoopConfiguration)
-      if (fs.exists(path)) {
-        if (!fs.delete(path, true)) {
-          logWarning(s"Error deleting ${path.toString()}")
-        }
-      }
+      path.getFileSystem(sc.hadoopConfiguration).delete(path, true)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index a7d0639..ce78774 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -91,7 +91,7 @@ private[spark] class EventLoggingListener(
    */
   def start() {
     if (!fileSystem.getFileStatus(new Path(logBaseDir)).isDirectory) {
-      throw new IllegalArgumentException(s"Log directory $logBaseDir does not exist.")
+      throw new IllegalArgumentException(s"Log directory $logBaseDir is not a directory.")
     }
 
     val workingPath = logPath + IN_PROGRESS
@@ -100,11 +100,8 @@ private[spark] class EventLoggingListener(
     val defaultFs = FileSystem.getDefaultUri(hadoopConf).getScheme
     val isDefaultLocal = defaultFs == null || defaultFs == "file"
 
-    if (shouldOverwrite && fileSystem.exists(path)) {
+    if (shouldOverwrite && fileSystem.delete(path, true)) {
       logWarning(s"Event log $path already exists. Overwriting...")
-      if (!fileSystem.delete(path, true)) {
-        logWarning(s"Error deleting $path")
-      }
     }
 
     /* The Hadoop LocalFileSystem (r1.0.4) has known issues with syncing (HADOOP-7844).
@@ -301,12 +298,6 @@ private[spark] object EventLoggingListener extends Logging {
    * @return input stream that holds one JSON record per line.
    */
   def openEventLog(log: Path, fs: FileSystem): InputStream = {
-    // It's not clear whether FileSystem.open() throws FileNotFoundException or just plain
-    // IOException when a file does not exist, so try our best to throw a proper exception.
-    if (!fs.exists(log)) {
-      throw new FileNotFoundException(s"File $log does not exist.")
-    }
-
     val in = new BufferedInputStream(fs.open(log))
 
     // Compression codec is encoded as an extension, e.g. app_123.lzf

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
----------------------------------------------------------------------
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 2f07395..df13b32 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.repl
 
-import java.io.{ByteArrayOutputStream, FilterInputStream, InputStream, IOException}
+import java.io.{ByteArrayOutputStream, FileNotFoundException, FilterInputStream, InputStream, IOException}
 import java.net.{HttpURLConnection, URI, URL, URLEncoder}
 import java.nio.channels.Channels
 
@@ -147,10 +147,11 @@ class ExecutorClassLoader(
   private def getClassFileInputStreamFromFileSystem(fileSystem: FileSystem)(
       pathInDirectory: String): InputStream = {
     val path = new Path(directory, pathInDirectory)
-    if (fileSystem.exists(path)) {
+    try {
       fileSystem.open(path)
-    } else {
-      throw new ClassNotFoundException(s"Class file not found at path $path")
+    } catch {
+      case _: FileNotFoundException =>
+        throw new ClassNotFoundException(s"Class file not found at path $path")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3335755..bec966b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.io.{DataInputStream, DataOutputStream, IOException}
+import java.io.{DataInputStream, DataOutputStream, FileNotFoundException, IOException}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -171,7 +171,7 @@ private[state] class HDFSBackedStateStoreProvider(
       if (tempDeltaFileStream != null) {
         tempDeltaFileStream.close()
       }
-      if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
+      if (tempDeltaFile != null) {
         fs.delete(tempDeltaFile, true)
       }
       logInfo("Aborted")
@@ -278,14 +278,12 @@ private[state] class HDFSBackedStateStoreProvider(
 
   /** Initialize the store provider */
   private def initialize(): Unit = {
-    if (!fs.exists(baseDir)) {
+    try {
       fs.mkdirs(baseDir)
-    } else {
-      if (!fs.isDirectory(baseDir)) {
+    } catch {
+      case e: IOException =>
         throw new IllegalStateException(
-          s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
-            s"$baseDir already exists and is not a directory")
-      }
+          s"Cannot use ${id.checkpointLocation} for storing state data for $this: $e ", e)
     }
   }
 
@@ -340,13 +338,16 @@ private[state] class HDFSBackedStateStoreProvider(
 
   private def updateFromDeltaFile(version: Long, map: MapType): Unit = {
     val fileToRead = deltaFile(version)
-    if (!fs.exists(fileToRead)) {
-      throw new IllegalStateException(
-        s"Error reading delta file $fileToRead of $this: $fileToRead does not exist")
-    }
     var input: DataInputStream = null
+    val sourceStream = try {
+      fs.open(fileToRead)
+    } catch {
+      case f: FileNotFoundException =>
+        throw new IllegalStateException(
+          s"Error reading delta file $fileToRead of $this: $fileToRead does not exist", f)
+    }
     try {
-      input = decompressStream(fs.open(fileToRead))
+      input = decompressStream(sourceStream)
       var eof = false
 
       while(!eof) {
@@ -405,8 +406,6 @@ private[state] class HDFSBackedStateStoreProvider(
 
   private def readSnapshotFile(version: Long): Option[MapType] = {
     val fileToRead = snapshotFile(version)
-    if (!fs.exists(fileToRead)) return None
-
     val map = new MapType()
     var input: DataInputStream = null
 
@@ -443,6 +442,9 @@ private[state] class HDFSBackedStateStoreProvider(
       }
       logInfo(s"Read snapshot file for version $version of $this from $fileToRead")
       Some(map)
+    } catch {
+      case _: FileNotFoundException =>
+        None
     } finally {
       if (input != null) input.close()
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index e73117c..061c743 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -75,9 +75,7 @@ public class JavaMetastoreDataSourcesSuite {
     hiveManagedPath = new Path(
       catalog.hiveDefaultTableFilePath(new TableIdentifier("javaSavedTable")));
     fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
-    if (fs.exists(hiveManagedPath)){
-      fs.delete(hiveManagedPath, true);
-    }
+    fs.delete(hiveManagedPath, true);
 
     List<String> jsonObjects = new ArrayList<>(10);
     for (int i = 0; i < 10; i++) {

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c36b027..3892fe8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -375,7 +375,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
       val filesystemPath = new Path(expectedPath)
       val fs = filesystemPath.getFileSystem(spark.sessionState.newHadoopConf())
-      if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+      fs.delete(filesystemPath, true)
 
       // It is a managed table when we do not specify the location.
       sql(

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 398fa65..5cbad8b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -117,7 +117,7 @@ object Checkpoint extends Logging {
 
     val path = new Path(checkpointDir)
     val fs = fsOption.getOrElse(path.getFileSystem(SparkHadoopUtil.get.conf))
-    if (fs.exists(path)) {
+    try {
       val statuses = fs.listStatus(path)
       if (statuses != null) {
         val paths = statuses.map(_.getPath)
@@ -127,9 +127,10 @@ object Checkpoint extends Logging {
         logWarning(s"Listing $path returned null")
         Seq.empty
       }
-    } else {
-      logWarning(s"Checkpoint directory $path does not exist")
-      Seq.empty
+    } catch {
+      case _: FileNotFoundException =>
+        logWarning(s"Checkpoint directory $path does not exist")
+        Seq.empty
     }
   }
 
@@ -229,9 +230,7 @@ class CheckpointWriter(
           logInfo(s"Saving checkpoint for time $checkpointTime to file '$checkpointFile'")
 
           // Write checkpoint to temp file
-          if (fs.exists(tempFile)) {
-            fs.delete(tempFile, true)   // just in case it exists
-          }
+          fs.delete(tempFile, true) // just in case it exists
           val fos = fs.create(tempFile)
           Utils.tryWithSafeFinally {
             fos.write(bytes)
@@ -242,9 +241,7 @@ class CheckpointWriter(
           // If the checkpoint file exists, back it up
           // If the backup exists as well, just delete it, otherwise rename will fail
           if (fs.exists(checkpointFile)) {
-            if (fs.exists(backupFile)) {
-              fs.delete(backupFile, true) // just in case it exists
-            }
+            fs.delete(backupFile, true) // just in case it exists
             if (!fs.rename(checkpointFile, backupFile)) {
               logWarning(s"Could not rename $checkpointFile to $backupFile")
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 9b689f0..845f554 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
+import java.io.FileNotFoundException
 import java.nio.ByteBuffer
 import java.util.{Iterator => JIterator}
 import java.util.concurrent.RejectedExecutionException
@@ -231,13 +232,25 @@ private[streaming] class FileBasedWriteAheadLog(
     val logDirectoryPath = new Path(logDirectory)
     val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
 
-    if (fileSystem.exists(logDirectoryPath) &&
-        fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
-      val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })
-      pastLogs.clear()
-      pastLogs ++= logFileInfo
-      logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
-      logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+    try {
+      // If you call listStatus(file) it returns a stat of the file in the array,
+      // rather than an array listing all the children.
+      // This makes it hard to differentiate listStatus(file) and
+      // listStatus(dir-with-one-child) except by examining the name of the returned status,
+      // and once you've got symlinks in the mix that differentiation isn't easy.
+      // Checking for the path being a directory is one more call to the filesystem, but
+      // leads to much clearer code.
+      if (fileSystem.getFileStatus(logDirectoryPath).isDirectory) {
+        val logFileInfo = logFilesTologInfo(
+          fileSystem.listStatus(logDirectoryPath).map { _.getPath })
+        pastLogs.clear()
+        pastLogs ++= logFileInfo
+        logInfo(s"Recovered ${logFileInfo.size} write ahead log files from $logDirectory")
+        logDebug(s"Recovered files are:\n${logFileInfo.map(_.path).mkString("\n")}")
+      }
+    } catch {
+      case _: FileNotFoundException =>
+        // there is no log directory, hence nothing to recover
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
index 13a765d..6a3b320 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
-import java.io.IOException
+import java.io.{FileNotFoundException, IOException}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
@@ -44,18 +44,16 @@ private[streaming] object HdfsUtils {
   def getInputStream(path: String, conf: Configuration): FSDataInputStream = {
     val dfsPath = new Path(path)
     val dfs = getFileSystemForPath(dfsPath, conf)
-    if (dfs.isFile(dfsPath)) {
-      try {
-        dfs.open(dfsPath)
-      } catch {
-        case e: IOException =>
-          // If we are really unlucky, the file may be deleted as we're opening the stream.
-          // This can happen as clean up is performed by daemon threads that may be left over from
-          // previous runs.
-          if (!dfs.isFile(dfsPath)) null else throw e
-      }
-    } else {
-      null
+    try {
+      dfs.open(dfsPath)
+    } catch {
+      case _: FileNotFoundException =>
+        null
+      case e: IOException =>
+        // If we are really unlucky, the file may be deleted as we're opening the stream.
+        // This can happen as clean up is performed by daemon threads that may be left over from
+        // previous runs.
+        if (!dfs.isFile(dfsPath)) null else throw e
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/cc97ea18/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e3572d7..9368400 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -189,9 +189,8 @@ private[spark] class Client(
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
       val fs = stagingDirPath.getFileSystem(hadoopConf)
-      if (!preserveFiles && fs.exists(stagingDirPath)) {
-        logInfo("Deleting staging directory " + stagingDirPath)
-        fs.delete(stagingDirPath, true)
+      if (!preserveFiles && fs.delete(stagingDirPath, true)) {
+        logInfo(s"Deleted staging directory $stagingDirPath")
       }
     } catch {
       case ioe: IOException =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org