Posted to commits@spark.apache.org by ma...@apache.org on 2016/10/07 18:16:25 UTC

spark git commit: [SPARK-16411][SQL][STREAMING] Add textFile to Structured Streaming.

Repository: spark
Updated Branches:
  refs/heads/master aa3a6841e -> bb1aaf28e


[SPARK-16411][SQL][STREAMING] Add textFile to Structured Streaming.

## What changes were proposed in this pull request?

Adds a textFile API to DataStreamReader, mirroring the one in DataFrameReader and serving the same purpose for streaming reads.
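
For illustration only, a minimal sketch of how the new API can be used; the session setup and the input path are hypothetical, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("TextFileStreamExample").getOrCreate()

// textFile returns a Dataset[String] (one element per line), whereas
// text returns a DataFrame with a single "value" column.
val lines = spark.readStream.textFile("/tmp/streaming-input")

// Dataset[String] supports typed transformations directly.
val kept = lines.filter(_.contains("keep"))
```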

## How was this patch tested?

Added a corresponding test case.

Author: Prashant Sharma <pr...@in.ibm.com>

Closes #14087 from ScrapCodes/textFile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb1aaf28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb1aaf28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb1aaf28

Branch: refs/heads/master
Commit: bb1aaf28eca6d9ae9af664ac3ad35cafdfc01a3b
Parents: aa3a684
Author: Prashant Sharma <pr...@in.ibm.com>
Authored: Fri Oct 7 11:16:24 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Oct 7 11:16:24 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamReader.scala  | 33 +++++++++++++++++++-
 .../sql/streaming/FileStreamSourceSuite.scala   | 18 +++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb1aaf28/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 864a9cd..87b7306 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.StructType
@@ -283,6 +283,37 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    */
   def text(path: String): DataFrame = format("text").load(path)
 
+  /**
+   * Loads text file(s) and returns a [[Dataset]] of String. The underlying schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * If the directory structure of the text files contains partitioning information, those are
+   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
+   *
+   * Each line in the text file is a new element in the resulting Dataset. For example:
+   * {{{
+   *   // Scala:
+   *   spark.readStream.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.readStream().textFile("/path/to/spark/README.md")
+   * }}}
+   *
+   * You can set the following text-specific options to deal with text files:
+   * <ul>
+   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
+   * considered in every trigger.</li>
+   * </ul>
+   *
+   * @param path input path
+   * @since 2.1.0
+   */
+  def textFile(path: String): Dataset[String] = {
+    if (userSpecifiedSchema.nonEmpty) {
+      throw new AnalysisException("User specified schema not supported with `textFile`")
+    }
+    text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
+  }
 
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
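
As the scaladoc above notes, the text-source options such as `maxFilesPerTrigger` still apply to `textFile`, while a user-specified schema is rejected. A minimal sketch of both behaviours, assuming a SparkSession named `spark` is in scope and using a hypothetical input path:

```scala
// maxFilesPerTrigger caps how many new files each trigger ingests.
val throttled = spark.readStream
  .option("maxFilesPerTrigger", "1")
  .textFile("/tmp/streaming-input")

// Setting a schema first hits the AnalysisException added above, because
// the schema is always a single string column named "value":
// spark.readStream.schema(someSchema).textFile("/tmp/streaming-input")
```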

http://git-wip-us.apache.org/repos/asf/spark/blob/bb1aaf28/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3157afe..7f9c981 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -342,6 +342,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("read from textfile") {
+    withTempDirs { case (src, tmp) =>
+      val textStream = spark.readStream.textFile(src.getCanonicalPath)
+      val filtered = textStream.filter(_.contains("keep"))
+
+      testStream(filtered)(
+        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+        StartStream(),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
+  }
+
   test("SPARK-17165 should not track the list of seen files indefinitely") {
     // This test works by:
     // 1. Create a file

