Posted to commits@spark.apache.org by zs...@apache.org on 2016/09/22 00:12:56 UTC

spark git commit: [SPARK-17569] Make StructuredStreaming FileStreamSource batch generation faster

Repository: spark
Updated Branches:
  refs/heads/master 8c3ee2bc4 -> 7cbe21644


[SPARK-17569] Make StructuredStreaming FileStreamSource batch generation faster

## What changes were proposed in this pull request?

While getting the batch for a `FileStreamSource` in StructuredStreaming, we know exactly which files we must process. We have already verified that they exist and committed them to a metadata log. When creating the FileSourceRelation for an incremental execution, however, the code checks the existence of every single file once again!

When you have 100,000s of files in a folder on S3, creating the first batch takes 2+ hours! This PR disables that redundant check.
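
To make the cost concrete, here is a minimal sketch of the pre-fix behavior (the bucket, file names, and loop are illustrative stand-ins, not the actual Spark code path): each `fs.exists` call against S3 is a remote round trip, so the check scales linearly with the number of already-committed files.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative only: the pre-fix code path amounted to one remote round trip
// per file that the metadata log had already committed.
val conf = new Configuration()
val committedFiles: Seq[Path] =
  (1 to 100000).map(i => new Path(s"s3a://bucket/dir/file-$i"))

committedFiles.foreach { path =>
  val fs = path.getFileSystem(conf)
  // On S3, each exists() is a network round trip; at tens of milliseconds
  // per request, 100,000 files adds hours of latency before the batch starts.
  if (!fs.exists(path)) {
    throw new IllegalArgumentException(s"Path does not exist: $path")
  }
}
```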

## How was this patch tested?

Added a unit test to `FileStreamSource`.
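
The test relies on Hadoop's pluggable FileSystem mechanism: setting `fs.<scheme>.impl` in the configuration routes every path with that scheme through a custom `FileSystem` class. A minimal sketch of the trick, assuming a placeholder scheme name `noexists` (the real test class, `ExistsThrowsExceptionFileSystem`, appears in the diff below):

```scala
import java.net.URI

import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
import org.apache.spark.sql.SparkSession

// A FileSystem that fails loudly if anything checks file existence.
class NoExistsFileSystem extends RawLocalFileSystem {
  override def getUri: URI = URI.create("noexists:///")
  override def exists(f: Path): Boolean =
    throw new IllegalArgumentException("exists() should not have been called")
}

val spark = SparkSession.builder().master("local").getOrCreate()
// Any code that rechecks existence under this scheme now fails immediately,
// which is exactly how the suite detects a regression.
spark.conf.set("fs.noexists.impl", classOf[NoExistsFileSystem].getName)
```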

Author: Burak Yavuz <br...@gmail.com>

Closes #15122 from brkyvz/SPARK-17569.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7cbe2164
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7cbe2164
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7cbe2164

Branch: refs/heads/master
Commit: 7cbe2164499e83b6c009fdbab0fbfffe89a2ecc0
Parents: 8c3ee2b
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Sep 21 17:12:52 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Sep 21 17:12:52 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 10 +++-
 .../execution/streaming/FileStreamSource.scala  |  3 +-
 .../streaming/FileStreamSourceSuite.scala       | 53 +++++++++++++++++++-
 3 files changed, 62 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7cbe2164/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 93154bd..413976a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -316,8 +316,14 @@ case class DataSource(
   /**
    * Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
    * [[DataSource]]
+   *
+   * @param checkFilesExist Whether to confirm that the files exist when generating the
+   *                        non-streaming file-based datasource. StructuredStreaming jobs already
+   *                        list the files and verify that they exist; when generating an
+   *                        incremental batch, the batch is treated as a non-streaming file-based
+   *                        data source whose files are known to exist, so no recheck is needed.
    */
-  def resolveRelation(): BaseRelation = {
+  def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
     val caseInsensitiveOptions = new CaseInsensitiveMap(options)
     val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
       // TODO: Throw when too much is given.
@@ -368,7 +374,7 @@ case class DataSource(
             throw new AnalysisException(s"Path does not exist: $qualified")
           }
           // Sufficient to check head of the globPath seq for non-glob scenario
-          if (!fs.exists(globPath.head)) {
+          if (checkFilesExist && !fs.exists(globPath.head)) {
             throw new AnalysisException(s"Path does not exist: ${globPath.head}")
           }
           globPath

http://git-wip-us.apache.org/repos/asf/spark/blob/7cbe2164/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0dc08b1..5ebc083 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -133,7 +133,8 @@ class FileStreamSource(
         userSpecifiedSchema = Some(schema),
         className = fileFormatClassName,
         options = sourceOptions.optionMapWithoutPath)
-    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
+    Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
+      checkFilesExist = false)))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7cbe2164/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
index c6db2fd..e8fa6a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSourceSuite.scala
@@ -17,9 +17,19 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.File
+import java.net.URI
+
+import scala.util.Random
+
+import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem}
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.streaming.ExistsThrowsExceptionFileSystem._
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
 
-class FileStreamSourceSuite extends SparkFunSuite {
+class FileStreamSourceSuite extends SparkFunSuite with SharedSQLContext {
 
   import FileStreamSource._
 
@@ -73,4 +83,45 @@ class FileStreamSourceSuite extends SparkFunSuite {
     assert(map.isNewFile(FileEntry("b", 10)))
   }
 
+  testWithUninterruptibleThread("do not recheck that files exist during getBatch") {
+    withTempDir { temp =>
+      spark.conf.set(
+        s"fs.$scheme.impl",
+        classOf[ExistsThrowsExceptionFileSystem].getName)
+      // add the metadata entries as a pre-req
+      val dir = new File(temp, "dir") // use a non-existent directory to test whether the log makes the dir
+      val metadataLog =
+        new FileStreamSourceLog(FileStreamSourceLog.VERSION, spark, dir.getAbsolutePath)
+      assert(metadataLog.add(0, Array(FileEntry(s"$scheme:///file1", 100L))))
+
+      val newSource = new FileStreamSource(spark, s"$scheme:///", "parquet", StructType(Nil),
+        dir.getAbsolutePath, Map.empty)
+      // this call would throw an exception if `fs.exists` were called during resolveRelation
+      newSource.getBatch(None, LongOffset(1))
+    }
+  }
+}
+
+/** Fake FileSystem to test whether the method `fs.exists` is called during
+ * `DataSource.resolveRelation`.
+ */
+class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
+  override def getUri: URI = {
+    URI.create(s"$scheme:///")
+  }
+
+  override def exists(f: Path): Boolean = {
+    throw new IllegalArgumentException("Exists shouldn't have been called!")
+  }
+
+  /** Simply return a single empty file status for now. */
+  override def listStatus(file: Path): Array[FileStatus] = {
+    val emptyFile = new FileStatus()
+    emptyFile.setPath(file)
+    Array(emptyFile)
+  }
+}
+
+object ExistsThrowsExceptionFileSystem {
+  val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
 }

