Posted to commits@spark.apache.org by ma...@apache.org on 2016/09/26 20:07:17 UTC

spark git commit: [SPARK-17153][SQL] Should read partition data when reading new files in filestream without globbing

Repository: spark
Updated Branches:
  refs/heads/master bde85f8b7 -> 8135e0e5e


[SPARK-17153][SQL] Should read partition data when reading new files in filestream without globbing

## What changes were proposed in this pull request?

When reading a file stream with a non-globbing path, the returned data has all `null`s for the
partitioned columns. E.g.,

    case class A(id: Int, value: Int)
    val data = spark.createDataset(Seq(
      A(1, 1),
      A(2, 2),
      A(2, 3)
    ))
    val url = "/tmp/test"
    data.write.partitionBy("id").parquet(url)
    spark.read.parquet(url).show

    +-----+---+
    |value| id|
    +-----+---+
    |    2|  2|
    |    3|  2|
    |    1|  1|
    +-----+---+

    val s = spark.readStream.schema(spark.read.load(url).schema).parquet(url)
    s.writeStream.queryName("test").format("memory").start()

    sql("SELECT * FROM test").show

    +-----+----+
    |value|  id|
    +-----+----+
    |    2|null|
    |    3|null|
    |    1|null|
    +-----+----+
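
For context, the fix leans on the same mechanism the batch reader already exposes through the `basePath` option: when a single partition directory is read directly, pointing `basePath` at the table root lets Spark recover the partition column from the directory name. A minimal batch-side sketch, reusing the illustrative `/tmp/test` layout from above:

    // Batch-read analogue of the behaviour this patch adds for the file stream source.
    val base = "/tmp/test"

    // Reading one partition directory directly drops the `id` partition column:
    spark.read.parquet(s"$base/id=1").show()

    // With `basePath` pointing at the table root, `id` is filled in from the
    // directory name; the streaming source now passes this option internally.
    spark.read.option("basePath", base).parquet(s"$base/id=1").show()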

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <si...@tw.ibm.com>
Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #14803 from viirya/filestreamsource-option.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8135e0e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8135e0e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8135e0e5

Branch: refs/heads/master
Commit: 8135e0e5ebdb9c7f5ac41c675dc8979a5127a31a
Parents: bde85f8
Author: Liang-Chi Hsieh <si...@tw.ibm.com>
Authored: Mon Sep 26 13:07:11 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Sep 26 13:07:11 2016 -0700

----------------------------------------------------------------------
 docs/structured-streaming-programming-guide.md  |  6 ++
 .../sql/execution/datasources/DataSource.scala  |  7 +-
 .../execution/streaming/FileStreamSource.scala  |  9 ++-
 .../sql/streaming/FileStreamSourceSuite.scala   | 83 +++++++++++++++++++-
 .../apache/spark/sql/streaming/StreamTest.scala |  8 ++
 5 files changed, 110 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8135e0e5/docs/structured-streaming-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index c7ed3b0..2e6df94 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -512,6 +512,12 @@ csvDF = spark \
 
 These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the [SQL Programming Guide](sql-programming-guide.html) for more details. Additionally, more details on the supported streaming sources are discussed later in the document.
 
+### Schema inference and partition of streaming DataFrames/Datasets
+
+By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting `spark.sql.streaming.schemaInference` to `true`.
+
+Partition discovery does occur when subdirectories that are named `/key=value/` are present and listing will automatically recurse into these directories. If these columns appear in the user provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add `/data/year=2016/` when `/data/year=2015/` was present, but it is invalid to change the partitioning column (i.e. by creating the directory `/data/date=2016-04-17/`).
+
 ## Operations on streaming DataFrames/Datasets
 You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. `select`, `where`, `groupBy`), to typed RDD-like operations (e.g. `map`, `filter`, `flatMap`). See the [SQL programming guide](sql-programming-guide.html) for more details. Let's take a look at a few example operations that you can use.
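
To make the documentation added above concrete, here is a minimal sketch of a streaming read over a `/key=value/` layout (the `/data/stream` path and the `value` field are illustrative, not part of the patch):

    import org.apache.spark.sql.types._

    // Illustrative layout: /data/stream/partition=foo/*.json, /data/stream/partition=bar/*.json
    // Default: supply the schema explicitly, including the partition column,
    // which Spark fills in from the directory names.
    val schema = new StructType()
      .add("value", StringType)
      .add("partition", StringType)
    val withSchema = spark.readStream.schema(schema).json("/data/stream")

    // Ad-hoc alternative: re-enable inference, as described above.
    spark.conf.set("spark.sql.streaming.schemaInference", "true")
    val inferred = spark.readStream.json("/data/stream")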
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8135e0e5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3206701..e75e7d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -197,10 +197,15 @@ case class DataSource(
         SparkHadoopUtil.get.globPathIfNecessary(qualified)
       }.toArray
       val fileCatalog = new ListingFileCatalog(sparkSession, globbedPaths, options, None)
-      format.inferSchema(
+      val partitionCols = fileCatalog.partitionSpec().partitionColumns.fields
+      val inferred = format.inferSchema(
         sparkSession,
         caseInsensitiveOptions,
         fileCatalog.allFiles())
+
+      inferred.map { inferredSchema =>
+        StructType(inferredSchema ++ partitionCols)
+      }
     }.getOrElse {
       throw new AnalysisException("Unable to infer schema. It must be specified manually.")
     }
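
In terms of the public types, the change above is plain `StructType` concatenation: whatever the format infers from the data files, plus the columns discovered from the `/key=value/` directories. A standalone sketch with illustrative field names:

    import org.apache.spark.sql.types._

    // Schema inferred by the file format from the data files ...
    val inferredSchema = new StructType().add("value", StringType)
    // ... and the columns recovered from /partition=.../ directories.
    val partitionCols = Array(StructField("partition", StringType))

    // StructType is a Seq[StructField], so appending is direct concatenation.
    val fullSchema = StructType(inferredSchema ++ partitionCols)
    // fullSchema.fieldNames: Array(value, partition)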

http://git-wip-us.apache.org/repos/asf/spark/blob/8135e0e5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index be02327..614a626 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -47,6 +47,13 @@ class FileStreamSource(
     fs.makeQualified(new Path(path))  // can contains glob patterns
   }
 
+  private val optionsWithPartitionBasePath = sourceOptions.optionMapWithoutPath ++ {
+    if (!SparkHadoopUtil.get.isGlobPath(new Path(path)) && options.contains("path")) {
+      Map("basePath" -> path)
+    } else {
+      Map()
+    }}
+
   private val metadataLog =
     new FileStreamSourceLog(FileStreamSourceLog.VERSION, sparkSession, metadataPath)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
@@ -136,7 +143,7 @@ class FileStreamSource(
         paths = files.map(_.path),
         userSpecifiedSchema = Some(schema),
         className = fileFormatClassName,
-        options = sourceOptions.optionMapWithoutPath)
+        options = optionsWithPartitionBasePath)
     Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation(
       checkFilesExist = false)))
   }
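
The `basePath` option is only injected for non-glob paths, because with a glob the user's pattern, rather than a single directory, defines the table root. A rough, self-contained stand-in for that check (not the actual `SparkHadoopUtil` helper):

    // A path is treated as a glob if it contains any Hadoop glob metacharacter.
    def looksLikeGlob(path: String): Boolean = path.exists("{}[]*?\\".contains(_))

    looksLikeGlob("/data/stream")      // false -> basePath=/data/stream is added
    looksLikeGlob("/data/*/stream")    // true  -> options are left as provided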

http://git-wip-us.apache.org/repos/asf/spark/blob/8135e0e5/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 55c95ae..3157afe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -102,6 +102,12 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext with Private
     }
   }
 
+  case class DeleteFile(file: File) extends ExternalAction {
+    def runAction(): Unit = {
+      Utils.deleteRecursively(file)
+    }
+  }
+
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   def createFileStream(
       format: String,
@@ -608,6 +614,81 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
 
   // =============== other tests ================
 
+  test("read new files in partitioned table without globbing, should read partition data") {
+    withTempDirs { case (dir, tmp) =>
+      val partitionFooSubDir = new File(dir, "partition=foo")
+      val partitionBarSubDir = new File(dir, "partition=bar")
+
+      val schema = new StructType().add("value", StringType).add("partition", StringType)
+      val fileStream = createFileStream("json", s"${dir.getCanonicalPath}", Some(schema))
+      val filtered = fileStream.filter($"value" contains "keep")
+      testStream(filtered)(
+        // Create new partition=foo sub dir and write to it
+        AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
+        CheckAnswer(("keep2", "foo")),
+
+        // Append to same partition=foo sub dir
+        AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
+        CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
+
+        // Create new partition sub dir and write to it
+        AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
+        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
+
+        // Append to same partition=bar sub dir
+        AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
+        CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"))
+      )
+    }
+  }
+
+  test("when schema inference is turned on, should read partition data") {
+    def createFile(content: String, src: File, tmp: File): Unit = {
+      val tempFile = Utils.tempFileWith(new File(tmp, "text"))
+      val finalFile = new File(src, tempFile.getName)
+      src.mkdirs()
+      require(stringToFile(tempFile, content).renameTo(finalFile))
+    }
+
+    withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
+      withTempDirs { case (dir, tmp) =>
+        val partitionFooSubDir = new File(dir, "partition=foo")
+        val partitionBarSubDir = new File(dir, "partition=bar")
+
+        // Create file in partition, so we can infer the schema.
+        createFile("{'value': 'drop0'}", partitionFooSubDir, tmp)
+
+        val fileStream = createFileStream("json", s"${dir.getCanonicalPath}")
+        val filtered = fileStream.filter($"value" contains "keep")
+        testStream(filtered)(
+          // Append to same partition=foo sub dir
+          AddTextFileData("{'value': 'drop1'}\n{'value': 'keep2'}", partitionFooSubDir, tmp),
+          CheckAnswer(("keep2", "foo")),
+
+          // Append to same partition=foo sub dir
+          AddTextFileData("{'value': 'keep3'}", partitionFooSubDir, tmp),
+          CheckAnswer(("keep2", "foo"), ("keep3", "foo")),
+
+          // Create new partition sub dir and write to it
+          AddTextFileData("{'value': 'keep4'}", partitionBarSubDir, tmp),
+          CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar")),
+
+          // Append to same partition=bar sub dir
+          AddTextFileData("{'value': 'keep5'}", partitionBarSubDir, tmp),
+          CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar")),
+
+          // Delete the two partition dirs
+          DeleteFile(partitionFooSubDir),
+          DeleteFile(partitionBarSubDir),
+
+          AddTextFileData("{'value': 'keep6'}", partitionBarSubDir, tmp),
+          CheckAnswer(("keep2", "foo"), ("keep3", "foo"), ("keep4", "bar"), ("keep5", "bar"),
+            ("keep6", "bar"))
+        )
+      }
+    }
+  }
+
   test("fault tolerance") {
     withTempDirs { case (src, tmp) =>
       val fileStream = createFileStream("text", src.getCanonicalPath)
@@ -792,7 +873,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       }
       assert(src.listFiles().size === numFiles)
 
-      val files = spark.readStream.text(root.getCanonicalPath).as[String]
+      val files = spark.readStream.text(root.getCanonicalPath).as[(String, Int)]
 
       // Note this query will use constant folding to eliminate the file scan.
       // This is to avoid actually running a Spark job with 10000 tasks

http://git-wip-us.apache.org/repos/asf/spark/blob/8135e0e5/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 6c5b170..aa6515b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -95,6 +95,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     def addData(query: Option[StreamExecution]): (Source, Offset)
   }
 
+  /** A trait that can be extended when testing a source. */
+  trait ExternalAction extends StreamAction {
+    def runAction(): Unit
+  }
+
   case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
     override def toString: String = s"AddData to $source: ${data.mkString(",")}"
 
@@ -429,6 +434,9 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                 failTest("Error adding data", e)
             }
 
+          case e: ExternalAction =>
+            e.runAction()
+
           case CheckAnswerRows(expectedAnswer, lastOnly, isSorted) =>
             verify(currentStream != null, "stream not running")
             // Get the map of source index to the current source objects
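
For reference, the new `ExternalAction` hook lets a test interleave arbitrary side effects with stream actions, which is how the `DeleteFile` action in the suite above is wired in. A hypothetical action of the same shape (the `TouchFile` name is made up for illustration):

    import java.io.File

    // Hypothetical: create a marker file between batches of a streaming test.
    case class TouchFile(file: File) extends ExternalAction {
      def runAction(): Unit = { file.createNewFile() }
    }

    // Used inside testStream(df)(..., TouchFile(new File(dir, "marker")), ...)
    // alongside AddTextFileData and CheckAnswer actions.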

