Posted to commits@spark.apache.org by zs...@apache.org on 2016/04/23 02:17:42 UTC

spark git commit: [SPARK-14832][SQL][STREAMING] Refactor DataSource to ensure schema is inferred only once when creating a file stream

Repository: spark
Updated Branches:
  refs/heads/master c25b97fcc -> c431a76d0


[SPARK-14832][SQL][STREAMING] Refactor DataSource to ensure schema is inferred only once when creating a file stream

## What changes were proposed in this pull request?

When creating a file stream using sqlContext.read.stream(), existing files are scanned twice to infer the schema:
- once, when creating the DataSource + StreamingRelation in DataFrameReader.stream(), and
- again, when creating the streaming Source from the DataSource, in DataSource.createSource().

Instead, the schema should be inferred only once, at the time the DataFrame is created; when the streaming Source is created, it should simply reuse that schema.

The solution proposed in this PR is to add a lazy field in DataSource that caches the inferred schema. The streaming Source created by the DataSource can then reuse that cached schema.
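
For readers unfamiliar with how a `lazy val` gives this one-time inference, here is a minimal standalone Scala sketch of the caching pattern. `SourceInfo` and `sourceInfo` mirror the names in the diff below, but `DataSourceLike`, `SimpleSchema`, and `expensiveSchemaInference` are hypothetical stand-ins for the real Spark classes, not the actual implementation.

```scala
// Sketch only: illustrates caching an expensive schema inference in a lazy val,
// as this PR does in DataSource. All types here are simplified stand-ins.
object LazySchemaCacheSketch {

  case class SimpleSchema(fields: Seq[String])            // stand-in for StructType
  case class SourceInfo(name: String, schema: SimpleSchema)

  class DataSourceLike(path: String) {
    private var inferenceCount = 0

    // The expensive work (scanning existing files) would happen here.
    private def expensiveSchemaInference(): SimpleSchema = {
      inferenceCount += 1
      SimpleSchema(Seq("k"))
    }

    // Evaluated at most once, on first access; later accesses (e.g. from the
    // StreamingRelation and from the streaming Source) reuse the cached result.
    lazy val sourceInfo: SourceInfo =
      SourceInfo(s"FileSource[$path]", expensiveSchemaInference())

    def timesInferred: Int = inferenceCount
  }

  def main(args: Array[String]): Unit = {
    val ds = new DataSourceLike("/data/in")
    val relationSchema = ds.sourceInfo.schema   // first access triggers inference
    val sourceSchema   = ds.sourceInfo.schema   // cached; no second scan
    println(s"schemas equal: ${relationSchema == sourceSchema}, inferred ${ds.timesInferred} time(s)")
  }
}
```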

## How was this patch tested?
Refactored unit tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #12591 from tdas/SPARK-14832.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c431a76d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c431a76d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c431a76d

Branch: refs/heads/master
Commit: c431a76d0628985bb445189b9a2913dd41b86f7b
Parents: c25b97f
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Apr 22 17:17:37 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Apr 22 17:17:37 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  |  35 ++++---
 .../execution/streaming/FileStreamSource.scala  |  22 +---
 .../execution/streaming/StreamingRelation.scala |   4 +-
 .../sql/streaming/FileStreamSourceSuite.scala   | 104 +++++++++++++++++--
 4 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c431a76d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 0dfe7db..07bc8ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -67,7 +67,10 @@ case class DataSource(
     bucketSpec: Option[BucketSpec] = None,
     options: Map[String, String] = Map.empty) extends Logging {
 
+  case class SourceInfo(name: String, schema: StructType)
+
   lazy val providingClass: Class[_] = lookupDataSource(className)
+  lazy val sourceInfo = sourceSchema()
 
   /** A map to maintain backward compatibility in case we move data sources around. */
   private val backwardCompatibilityMap = Map(
@@ -145,17 +148,19 @@ case class DataSource(
   }
 
   /** Returns the name and schema of the source that can be used to continually read data. */
-  def sourceSchema(): (String, StructType) = {
+  private def sourceSchema(): SourceInfo = {
     providingClass.newInstance() match {
       case s: StreamSourceProvider =>
-        s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
+        val (name, schema) = s.sourceSchema(sqlContext, userSpecifiedSchema, className, options)
+        SourceInfo(name, schema)
 
       case format: FileFormat =>
         val caseInsensitiveOptions = new CaseInsensitiveMap(options)
         val path = caseInsensitiveOptions.getOrElse("path", {
           throw new IllegalArgumentException("'path' is not specified")
         })
-        (s"FileSource[$path]", inferFileFormatSchema(format))
+        SourceInfo(s"FileSource[$path]", inferFileFormatSchema(format))
+
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")
@@ -174,24 +179,20 @@ case class DataSource(
           throw new IllegalArgumentException("'path' is not specified")
         })
 
-        val dataSchema = inferFileFormatSchema(format)
-
         def dataFrameBuilder(files: Array[String]): DataFrame = {
-          Dataset.ofRows(
-            sqlContext,
-            LogicalRelation(
-              DataSource(
-                sqlContext,
-                paths = files,
-                userSpecifiedSchema = Some(dataSchema),
-                className = className,
-                options =
-                  new CaseInsensitiveMap(
-                    options.filterKeys(_ != "path") + ("basePath" -> path))).resolveRelation()))
+          val newOptions = options.filterKeys(_ != "path") + ("basePath" -> path)
+          val newDataSource =
+            DataSource(
+              sqlContext,
+              paths = files,
+              userSpecifiedSchema = Some(sourceInfo.schema),
+              className = className,
+              options = new CaseInsensitiveMap(newOptions))
+          Dataset.ofRows(sqlContext, LogicalRelation(newDataSource.resolveRelation()))
         }
 
         new FileStreamSource(
-          sqlContext, metadataPath, path, Some(dataSchema), className, dataFrameBuilder)
+          sqlContext, metadataPath, path, sourceInfo.schema, dataFrameBuilder)
       case _ =>
         throw new UnsupportedOperationException(
           s"Data source $className does not support streamed reading")

http://git-wip-us.apache.org/repos/asf/spark/blob/c431a76d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 6448cb6..51c3aee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -35,8 +35,7 @@ class FileStreamSource(
     sqlContext: SQLContext,
     metadataPath: String,
     path: String,
-    dataSchema: Option[StructType],
-    providerName: String,
+    override val schema: StructType,
     dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
 
   private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
@@ -48,24 +47,6 @@ class FileStreamSource(
     files.foreach(seenFiles.add)
   }
 
-  /** Returns the schema of the data from this source */
-  override lazy val schema: StructType = {
-    dataSchema.getOrElse {
-      val filesPresent = fetchAllFiles()
-      if (filesPresent.isEmpty) {
-        if (providerName == "text") {
-          // Add a default schema for "text"
-          new StructType().add("value", StringType)
-        } else {
-          throw new IllegalArgumentException("No schema specified")
-        }
-      } else {
-        // There are some existing files. Use them to infer the schema.
-        dataFrameBuilder(filesPresent.toArray).schema
-      }
-    }
-  }
-
   /**
    * Returns the maximum offset that can be retrieved from the source.
    *
@@ -118,7 +99,6 @@ class FileStreamSource(
     logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
     logDebug(s"Streaming ${files.mkString(", ")}")
     dataFrameBuilder(files)
-
   }
 
   private def fetchAllFiles(): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/c431a76d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index c29291e..3341580 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.execution.datasources.DataSource
 
 object StreamingRelation {
   def apply(dataSource: DataSource): StreamingRelation = {
-    val (name, schema) = dataSource.sourceSchema()
-    StreamingRelation(dataSource, name, schema.toAttributes)
+    StreamingRelation(
+      dataSource, dataSource.sourceInfo.name, dataSource.sourceInfo.schema.toAttributes)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c431a76d/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 64cddf0..45dca2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -19,11 +19,11 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
 
-import org.apache.spark.sql.{AnalysisException, StreamTest}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{StringType, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 class FileStreamSourceTest extends StreamTest with SharedSQLContext {
@@ -44,20 +44,32 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
 
   case class AddParquetFileData(
       source: FileStreamSource,
-      content: Seq[String],
+      df: DataFrame,
       src: File,
       tmp: File) extends AddData {
 
     override def addData(): Offset = {
       source.withBatchingLocked {
-        val file = Utils.tempFileWith(new File(tmp, "parquet"))
-        content.toDS().toDF().write.parquet(file.getCanonicalPath)
-        file.renameTo(new File(src, file.getName))
+        AddParquetFileData.writeToFile(df, src, tmp)
         source.currentOffset
       } + 1
     }
   }
 
+  object AddParquetFileData {
+    def apply(
+      source: FileStreamSource,
+      seq: Seq[String],
+      src: File,
+      tmp: File): AddParquetFileData = new AddParquetFileData(source, seq.toDS().toDF(), src, tmp)
+
+    def writeToFile(df: DataFrame, src: File, tmp: File): Unit = {
+      val file = Utils.tempFileWith(new File(tmp, "parquet"))
+      df.write.parquet(file.getCanonicalPath)
+      file.renameTo(new File(src, file.getName))
+    }
+  }
+
   /** Use `format` and `path` to create FileStreamSource via DataFrameReader */
   def createFileStreamSource(
       format: String,
@@ -78,6 +90,17 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext {
       }.head
   }
 
+  def withTempDirs(body: (File, File) => Unit) {
+    val src = Utils.createTempDir(namePrefix = "streaming.src")
+    val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
+    try {
+      body(src, tmp)
+    } finally {
+      Utils.deleteRecursively(src)
+      Utils.deleteRecursively(tmp)
+    }
+  }
+
   val valueSchema = new StructType().add("value", StringType)
 }
 
@@ -99,9 +122,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
         reader.stream()
       }
     df.queryExecution.analyzed
-      .collect { case StreamingRelation(dataSource, _, _) =>
-        dataSource.sourceSchema()
-      }.head._2
+      .collect { case s @ StreamingRelation(dataSource, _, _) => s.schema }.head
   }
 
   test("FileStreamSource schema: no path") {
@@ -305,6 +326,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     )
   }
 
+
+  test("reading from json files with changing schema") {
+    withTempDirs { case (src, tmp) =>
+
+      // Add a file so that we can infer its schema
+      stringToFile(new File(src, "existing"), "{'k': 'value0'}")
+
+      val textSource = createFileStreamSource("json", src.getCanonicalPath)
+
+      // FileStreamSource should infer the column "k"
+      val text = textSource.toDF()
+      assert(text.schema === StructType(Seq(StructField("k", StringType))))
+
+      // After creating DF and before starting stream, add data with different schema
+      // Should not affect the inferred schema any more
+      stringToFile(new File(src, "existing2"), "{'k': 'value1', 'v': 'new'}")
+
+      testStream(text)(
+
+        // Should not pick up column v in the file added before start
+        AddTextFileData(textSource, "{'k': 'value2'}", src, tmp),
+        CheckAnswer("value0", "value1", "value2"),
+
+        // Should read data in column k, and ignore v
+        AddTextFileData(textSource, "{'k': 'value3', 'v': 'new'}", src, tmp),
+        CheckAnswer("value0", "value1", "value2", "value3"),
+
+        // Should ignore rows that do not have the necessary k column
+        AddTextFileData(textSource, "{'v': 'value4'}", src, tmp),
+        CheckAnswer("value0", "value1", "value2", "value3", null))
+    }
+  }
+
   test("read from parquet files") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
     val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
@@ -327,6 +381,38 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
     Utils.deleteRecursively(tmp)
   }
 
+  test("read from parquet files with changing schema") {
+
+    withTempDirs { case (src, tmp) =>
+      // Add a file so that we can infer its schema
+      AddParquetFileData.writeToFile(Seq("value0").toDF("k"), src, tmp)
+
+      val fileSource = createFileStreamSource("parquet", src.getCanonicalPath)
+      val parquetData = fileSource.toDF()
+
+      // FileStreamSource should infer the column "k"
+      assert(parquetData.schema === StructType(Seq(StructField("k", StringType))))
+
+      // After creating DF and before starting stream, add data with different schema
+      // Should not affect the inferred schema any more
+      AddParquetFileData.writeToFile(Seq(("value1", 0)).toDF("k", "v"), src, tmp)
+
+      testStream(parquetData)(
+        // Should not pick up column v in the file added before start
+        AddParquetFileData(fileSource, Seq("value2").toDF("k"), src, tmp),
+        CheckAnswer("value0", "value1", "value2"),
+
+        // Should read data in column k, and ignore v
+        AddParquetFileData(fileSource, Seq(("value3", 1)).toDF("k", "v"), src, tmp),
+        CheckAnswer("value0", "value1", "value2", "value3"),
+
+        // Should ignore rows that do not have the necessary k column
+        AddParquetFileData(fileSource, Seq("value5").toDF("v"), src, tmp),
+        CheckAnswer("value0", "value1", "value2", "value3", null)
+      )
+    }
+  }
+
   test("file stream source without schema") {
     val src = Utils.createTempDir(namePrefix = "streaming.src")
 

