Posted to commits@spark.apache.org by zs...@apache.org on 2017/03/09 19:02:46 UTC

spark git commit: [SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource

Repository: spark
Updated Branches:
  refs/heads/master 3232e54f2 -> 40da4d181


[SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource

## What changes were proposed in this pull request?

Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this causes false negatives when the path has changed in a cosmetic way (e.g., changing `s3n` to `s3a`): the already-processed file is not recognized as seen and is processed again.

This patch adds an option `fileNameOnly` that causes the new-file check to be based only on the filename (the whole path is still stored in the log).

## Usage

```scala
spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  ...
```
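
A fuller, hypothetical variant of the snippet above (the console sink and `awaitTermination` call are illustrative, not from the patch), assuming a `SparkSession` named `spark`:

```scala
// Consider files with the same filename as already seen, even when their
// full paths differ (e.g. an s3n:// path later re-listed as s3a://).
val query = spark
  .readStream
  .option("fileNameOnly", true)
  .text("s3n://bucket/dir1/dir2")
  .writeStream
  .format("console")
  .start()

query.awaitTermination()
```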
## How was this patch tested?

Added a test case (`SeenFilesMap with fileNameOnly = true`) to `FileStreamSourceSuite`.

Author: Liwei Lin <lw...@gmail.com>

Closes #17120 from lw-lin/filename-only.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40da4d18
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40da4d18
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40da4d18

Branch: refs/heads/master
Commit: 40da4d181d648308de85fdcabc5c098ee861949a
Parents: 3232e54
Author: Liwei Lin <lw...@gmail.com>
Authored: Thu Mar 9 11:02:44 2017 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Thu Mar 9 11:02:44 2017 -0800

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md  | 12 +++++--
 .../execution/streaming/FileStreamOptions.scala | 34 ++++++++++++++------
 .../execution/streaming/FileStreamSource.scala  | 25 +++++++++-----
 .../sql/streaming/FileStreamSourceSuite.scala   | 22 +++++++++++--
 4 files changed, 72 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/40da4d18/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 6af47b6..995ac77 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark.
     <td>Append</td>
     <td>
         <code>path</code>: path to the output directory, must be specified.
+        <br/>
         <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
         <br/>
-        <code>latestFirst</code>: whether to processs the latest new files first, useful when there is a large backlog of files(default: false)
-        <br/><br/>
+        <code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
+        <br/>
+        <code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered the same file, because their filenames, "dataset.txt", are the same:
+        <br/>
+        · "file:///dataset.txt"<br/>
+        · "s3://a/dataset.txt"<br/>
+        · "s3n://a/b/dataset.txt"<br/>
+        · "s3a://a/b/c/dataset.txt"<br/>
+        <br/>
         For file-format-specific options, see the related methods in DataFrameWriter
         (<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>).
         E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>

http://git-wip-us.apache.org/repos/asf/spark/blob/40da4d18/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index e7ba901..d54ed44 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -61,13 +61,29 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
    * Whether to scan latest files first. If it's true, when the source finds unprocessed files in a
    * trigger, it will first process the latest files.
    */
-  val latestFirst: Boolean = parameters.get("latestFirst").map { str =>
-    try {
-      str.toBoolean
-    } catch {
-      case _: IllegalArgumentException =>
-        throw new IllegalArgumentException(
-          s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'")
-    }
-  }.getOrElse(false)
+  val latestFirst: Boolean = withBooleanParameter("latestFirst", false)
+
+  /**
+   * Whether to check new files based on only the filename instead of on the full path.
+   *
+   * With this set to `true`, the following files would be considered the same file, because
+   * their filenames, "dataset.txt", are the same:
+   * - "file:///dataset.txt"
+   * - "s3://a/dataset.txt"
+   * - "s3n://a/b/dataset.txt"
+   * - "s3a://a/b/c/dataset.txt"
+   */
+  val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)
+
+  private def withBooleanParameter(name: String, default: Boolean) = {
+    parameters.get(name).map { str =>
+      try {
+        str.toBoolean
+      } catch {
+        case _: IllegalArgumentException =>
+          throw new IllegalArgumentException(
+            s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
+      }
+    }.getOrElse(default)
+  }
 }
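
The `withBooleanParameter` helper above centralizes the strict boolean parsing shared by `latestFirst` and `fileNameOnly`. A minimal standalone sketch of the same pattern (the `booleanOption` name and plain `Map` are illustrative, not part of the patch):

```scala
// Strictly parse an option value as a boolean, rejecting anything
// other than "true"/"false" (case-insensitive) with a descriptive error.
def booleanOption(parameters: Map[String, String],
                  name: String,
                  default: Boolean): Boolean =
  parameters.get(name).map { str =>
    try str.toBoolean
    catch {
      case _: IllegalArgumentException =>
        throw new IllegalArgumentException(
          s"Invalid value '$str' for option '$name', must be 'true' or 'false'")
    }
  }.getOrElse(default)

// booleanOption(Map("fileNameOnly" -> "TRUE"), "fileNameOnly", false) // true
// booleanOption(Map.empty, "fileNameOnly", false)                     // false (default)
// booleanOption(Map("fileNameOnly" -> "yes"), "fileNameOnly", false)  // throws
```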

http://git-wip-us.apache.org/repos/asf/spark/blob/40da4d18/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 0f09b0a..411a15f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.net.URI
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -79,9 +81,16 @@ class FileStreamSource(
     sourceOptions.maxFileAgeMs
   }
 
+  private val fileNameOnly = sourceOptions.fileNameOnly
+  if (fileNameOnly) {
+    logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " +
+      "UUID), otherwise, files with the same name but under different paths will be considered " +
+      "the same and causes data lost.")
+  }
+
   /** A mapping from a file that we have processed to some timestamp it was last modified. */
   // Visible for testing and debugging in production.
-  val seenFiles = new SeenFilesMap(maxFileAgeMs)
+  val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)
 
   metadataLog.allFiles().foreach { entry =>
     seenFiles.add(entry.path, entry.timestamp)
@@ -268,7 +277,7 @@ object FileStreamSource {
    * To prevent the hash map from growing indefinitely, a purge function is available to
    * remove files "maxAgeMs" older than the latest file.
    */
-  class SeenFilesMap(maxAgeMs: Long) {
+  class SeenFilesMap(maxAgeMs: Long, fileNameOnly: Boolean) {
     require(maxAgeMs >= 0)
 
     /** Mapping from file to its timestamp. */
@@ -280,9 +289,13 @@ object FileStreamSource {
     /** Timestamp for the last purge operation. */
     private var lastPurgeTimestamp: Timestamp = 0L
 
+    @inline private def stripPathIfNecessary(path: String) = {
+      if (fileNameOnly) new Path(new URI(path)).getName else path
+    }
+
     /** Add a new file to the map. */
     def add(path: String, timestamp: Timestamp): Unit = {
-      map.put(path, timestamp)
+      map.put(stripPathIfNecessary(path), timestamp)
       if (timestamp > latestTimestamp) {
         latestTimestamp = timestamp
       }
@@ -295,7 +308,7 @@ object FileStreamSource {
     def isNewFile(path: String, timestamp: Timestamp): Boolean = {
       // Note that we are testing against lastPurgeTimestamp here so we'd never miss a file that
       // is older than (latestTimestamp - maxAgeMs) but has not been purged yet.
-      timestamp >= lastPurgeTimestamp && !map.containsKey(path)
+      timestamp >= lastPurgeTimestamp && !map.containsKey(stripPathIfNecessary(path))
     }
 
     /** Removes aged entries and returns the number of files removed. */
@@ -314,9 +327,5 @@ object FileStreamSource {
     }
 
     def size: Int = map.size()
-
-    def allEntries: Seq[(String, Timestamp)] = {
-      map.asScala.toSeq
-    }
   }
 }
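
The core of the change is `stripPathIfNecessary`: with `fileNameOnly` enabled, `new Path(new URI(path)).getName` reduces a full URI to its final path component before it is used as the map key. A quick illustrative check (assuming Hadoop's `org.apache.hadoop.fs.Path` on the classpath, as in Spark itself):

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// All four example paths from the docs reduce to the same filename, so
// with fileNameOnly = true they map to a single SeenFilesMap entry.
Seq(
  "file:///dataset.txt",
  "s3://a/dataset.txt",
  "s3n://a/b/dataset.txt",
  "s3a://a/b/c/dataset.txt"
).foreach { p =>
  assert(new Path(new URI(p)).getName == "dataset.txt")
}
```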

http://git-wip-us.apache.org/repos/asf/spark/blob/40da4d18/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 0517b0a..f705da3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1236,7 +1236,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
   }
 
   test("SeenFilesMap") {
-    val map = new SeenFilesMap(maxAgeMs = 10)
+    val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
 
     map.add("a", 5)
     assert(map.size == 1)
@@ -1269,8 +1269,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     assert(map.isNewFile("e", 20))
   }
 
+  test("SeenFilesMap with fileNameOnly = true") {
+    val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true)
+
+    map.add("file:///a/b/c/d", 5)
+    map.add("file:///a/b/c/e", 5)
+    assert(map.size === 2)
+
+    assert(!map.isNewFile("d", 5))
+    assert(!map.isNewFile("file:///d", 5))
+    assert(!map.isNewFile("file:///x/d", 5))
+    assert(!map.isNewFile("file:///x/y/d", 5))
+
+    map.add("s3:///bucket/d", 5)
+    map.add("s3n:///bucket/d", 5)
+    map.add("s3a:///bucket/d", 5)
+    assert(map.size === 2)
+  }
+
   test("SeenFilesMap should only consider a file old if it is earlier than last purge time") {
-    val map = new SeenFilesMap(maxAgeMs = 10)
+    val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = false)
 
     map.add("a", 20)
     assert(map.size == 1)

