You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/10/07 18:16:25 UTC
spark git commit: [SPARK-16411][SQL][STREAMING] Add textFile to Structured Streaming.
Repository: spark
Updated Branches:
refs/heads/master aa3a6841e -> bb1aaf28e
[SPARK-16411][SQL][STREAMING] Add textFile to Structured Streaming.
## What changes were proposed in this pull request?
Adds a `textFile` API to `DataStreamReader`, mirroring the existing `DataFrameReader.textFile` and serving the same purpose for streaming reads.
## How was this patch tested?
Added a corresponding test case to `FileStreamSourceSuite`.
Author: Prashant Sharma <pr...@in.ibm.com>
Closes #14087 from ScrapCodes/textFile.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb1aaf28
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb1aaf28
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb1aaf28
Branch: refs/heads/master
Commit: bb1aaf28eca6d9ae9af664ac3ad35cafdfc01a3b
Parents: aa3a684
Author: Prashant Sharma <pr...@in.ibm.com>
Authored: Fri Oct 7 11:16:24 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Oct 7 11:16:24 2016 -0700
----------------------------------------------------------------------
.../spark/sql/streaming/DataStreamReader.scala | 33 +++++++++++++++++++-
.../sql/streaming/FileStreamSourceSuite.scala | 18 +++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/bb1aaf28/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 864a9cd..87b7306 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.StructType
@@ -283,6 +283,37 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*/
def text(path: String): DataFrame = format("text").load(path)
+ /**
+ * Loads text file(s) and returns a [[Dataset]] of String. The underlying schema of the Dataset
+ * contains a single string column named "value".
+ *
+ * If the directory structure of the text files contains partitioning information, that
+ * information is ignored in the resulting Dataset. To keep partition columns, use `text`.
+ *
+ * Each line in the text file is a new element in the resulting Dataset. For example:
+ * {{{
+ * // Scala:
+ * spark.readStream.textFile("/path/to/spark/README.md")
+ *
+ * // Java:
+ * spark.readStream().textFile("/path/to/spark/README.md")
+ * }}}
+ *
+ * You can set the following text-specific options to deal with text files:
+ * <ul>
+ * <li>`maxFilesPerTrigger` (default: no max limit): maximum number of new files per trigger.</li>
+ * </ul>
+ *
+ * @param path input path
+ * @throws AnalysisException if a user-specified schema has been set on this reader
+ * @since 2.1.0
+ */
+ def textFile(path: String): Dataset[String] = {
+ if (userSpecifiedSchema.nonEmpty) { // the schema is fixed: a single string column "value"
+ throw new AnalysisException("User specified schema not supported with `textFile`")
+ }
+ text(path).select("value").as[String](sparkSession.implicits.newStringEncoder) // drops partition cols
+ }
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
http://git-wip-us.apache.org/repos/asf/spark/blob/bb1aaf28/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3157afe..7f9c981 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -342,6 +342,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ test("read from textfile") {
+ withTempDirs { case (src, tmp) =>
+ val textStream = spark.readStream.textFile(src.getCanonicalPath)
+ val filtered = textStream.filter(_.contains("keep"))
+ // Each line becomes one String element; only lines containing "keep" should appear below.
+ testStream(filtered)(
+ AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+ CheckAnswer("keep2", "keep3"),
+ StopStream, // the file added while stopped must still be read after the restart
+ AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+ StartStream(),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AddTextFileData("drop7\nkeep8\nkeep9", src, tmp), // answers are cumulative (presumably all rows so far)
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+ )
+ }
+ }
+
test("SPARK-17165 should not track the list of seen files indefinitely") {
// This test works by:
// 1. Create a file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org