Posted to commits@spark.apache.org by br...@apache.org on 2017/07/19 22:56:30 UTC

spark git commit: [SPARK-21463] Allow userSpecifiedSchema to override partition inference performed by MetadataLogFileIndex

Repository: spark
Updated Branches:
  refs/heads/master 8cd9cdf17 -> 2c9d5ef1f


[SPARK-21463] Allow userSpecifiedSchema to override partition inference performed by MetadataLogFileIndex

## What changes were proposed in this pull request?

When using the MetadataLogFileIndex to read back a table, we don't respect the user-provided schema when determining the types of the partition columns. This can lead to issues such as strings that look like dates being inferred as DateType, or long columns being inferred as IntegerType, simply because no value in the data requires the wider type.
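
For illustration, a minimal sketch of the read path this change affects (the output directory and column names below are hypothetical, mirroring the new unit test): declaring the partition column `time` as a string in the user-specified schema now takes precedence over whatever type partition inference would pick from the directory names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val spark = SparkSession.builder().appName("spark-21463-sketch").getOrCreate()

// Hypothetical directory previously written by a file streaming sink,
// partitioned by a string column `time` with values like `2017-01-01-00`.
val output = "/tmp/stream-output"

// Declare the partition column as StringType so inference cannot narrow it.
val userSchema = new StructType()
  .add("time", StringType)
  .add("value", IntegerType)

// With this change, the user-specified type for `time` overrides the type
// inferred by MetadataLogFileIndex from the partition directory names.
val readBack = spark.read.schema(userSchema).parquet(output)
readBack.printSchema()
```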

## How was this patch tested?

Unit tests and manual tests

Author: Burak Yavuz <br...@gmail.com>

Closes #18676 from brkyvz/stream-partitioning.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c9d5ef1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c9d5ef1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c9d5ef1

Branch: refs/heads/master
Commit: 2c9d5ef1f0c30713dafbf8ef0eb69d5520f7dcaf
Parents: 8cd9cdf
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Jul 19 15:56:26 2017 -0700
Committer: Burak Yavuz <br...@gmail.com>
Committed: Wed Jul 19 15:56:26 2017 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  | 33 +++++++++++++++-----
 .../execution/streaming/FileStreamSource.scala  |  2 +-
 .../streaming/MetadataLogFileIndex.scala        | 11 +++++--
 .../ParquetPartitionDiscoverySuite.scala        | 33 ++++++++++++++++++++
 4 files changed, 69 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c9d5ef1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d36a04f..cbe8ce4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -97,6 +97,24 @@ case class DataSource(
   }
 
   /**
+   * In the read path, only managed tables by Hive provide the partition columns properly when
+   * initializing this class. All other file based data sources will try to infer the partitioning,
+   * and then cast the inferred types to user specified dataTypes if the partition columns exist
+   * inside `userSpecifiedSchema`, otherwise we can hit data corruption bugs like SPARK-18510, or
+   * inconsistent data types as reported in SPARK-21463.
+   * @param fileIndex A FileIndex that will perform partition inference
+   * @return The PartitionSchema resolved from inference and cast according to `userSpecifiedSchema`
+   */
+  private def combineInferredAndUserSpecifiedPartitionSchema(fileIndex: FileIndex): StructType = {
+    val resolved = fileIndex.partitionSchema.map { partitionField =>
+      // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
+      userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
+        partitionField)
+    }
+    StructType(resolved)
+  }
+
+  /**
    * Get the schema of the given FileFormat, if provided by `userSpecifiedSchema`, or try to infer
    * it. In the read path, only managed tables by Hive provide the partition columns properly when
    * initializing this class. All other file based data sources will try to infer the partitioning,
@@ -139,12 +157,7 @@ case class DataSource(
     val partitionSchema = if (partitionColumns.isEmpty) {
       // Try to infer partitioning, because no DataSource in the read path provides the partitioning
       // columns properly unless it is a Hive DataSource
-      val resolved = tempFileIndex.partitionSchema.map { partitionField =>
-        // SPARK-18510: try to get schema from userSpecifiedSchema, otherwise fallback to inferred
-        userSpecifiedSchema.flatMap(_.find(f => equality(f.name, partitionField.name))).getOrElse(
-          partitionField)
-      }
-      StructType(resolved)
+      combineInferredAndUserSpecifiedPartitionSchema(tempFileIndex)
     } else {
       // maintain old behavior before SPARK-18510. If userSpecifiedSchema is empty used inferred
       // partitioning
@@ -336,7 +349,13 @@ case class DataSource(
             caseInsensitiveOptions.get("path").toSeq ++ paths,
             sparkSession.sessionState.newHadoopConf()) =>
         val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
-        val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath)
+        val tempFileCatalog = new MetadataLogFileIndex(sparkSession, basePath, None)
+        val fileCatalog = if (userSpecifiedSchema.nonEmpty) {
+          val partitionSchema = combineInferredAndUserSpecifiedPartitionSchema(tempFileCatalog)
+          new MetadataLogFileIndex(sparkSession, basePath, Option(partitionSchema))
+        } else {
+          tempFileCatalog
+        }
         val dataSchema = userSpecifiedSchema.orElse {
           format.inferSchema(
             sparkSession,
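
As a rough standalone sketch of the resolution rule factored out above (simplified: the real `combineInferredAndUserSpecifiedPartitionSchema` works against a `FileIndex` and the session's configured name resolver, while the snippet below hard-codes a case-insensitive match): each inferred partition field is replaced by the matching field from `userSpecifiedSchema` when one exists, and kept as inferred otherwise.

```scala
import org.apache.spark.sql.types.StructType

// Simplified stand-in for the resolution performed in DataSource:
// prefer the user-declared field (and its data type) for every inferred
// partition column, falling back to the inferred field when it is absent.
def combinePartitionSchema(
    inferred: StructType,
    userSpecifiedSchema: Option[StructType]): StructType = {
  val resolved = inferred.map { partitionField =>
    userSpecifiedSchema
      .flatMap(_.find(f => f.name.equalsIgnoreCase(partitionField.name)))
      .getOrElse(partitionField)
  }
  StructType(resolved)
}
```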

http://git-wip-us.apache.org/repos/asf/spark/blob/2c9d5ef1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index a9e64c6..4b1b252 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -195,7 +195,7 @@ class FileStreamSource(
   private def allFilesUsingMetadataLogFileIndex() = {
     // Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
     // non-glob path
-    new MetadataLogFileIndex(sparkSession, qualifiedBasePath).allFiles()
+    new MetadataLogFileIndex(sparkSession, qualifiedBasePath, None).allFiles()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2c9d5ef1/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
index aeaa134..1da703c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -23,14 +23,21 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.types.StructType
 
 
 /**
 * A [[FileIndex]] that generates the list of files to process by reading them from the
  * metadata log files generated by the [[FileStreamSink]].
+ *
+ * @param userPartitionSchema an optional partition schema that will be used to provide types for
+ *                            the discovered partitions
  */
-class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
-  extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
+class MetadataLogFileIndex(
+    sparkSession: SparkSession,
+    path: Path,
+    userPartitionSchema: Option[StructType])
+  extends PartitioningAwareFileIndex(sparkSession, Map.empty, userPartitionSchema) {
 
   private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
   logInfo(s"Reading streaming file log from $metadataDirectory")

http://git-wip-us.apache.org/repos/asf/spark/blob/2c9d5ef1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 84b34d5..2f5fd84 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
+import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -1022,4 +1023,36 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-21463: MetadataLogFileIndex should respect userSpecifiedSchema for partition cols") {
+    withTempDir { tempDir =>
+      val output = new File(tempDir, "output").toString
+      val checkpoint = new File(tempDir, "chkpoint").toString
+      try {
+        val stream = MemoryStream[(String, Int)]
+        val df = stream.toDS().toDF("time", "value")
+        val sq = df.writeStream
+          .option("checkpointLocation", checkpoint)
+          .format("parquet")
+          .partitionBy("time")
+          .start(output)
+
+        stream.addData(("2017-01-01-00", 1), ("2017-01-01-01", 2))
+        sq.processAllAvailable()
+
+        val schema = new StructType()
+          .add("time", StringType)
+          .add("value", IntegerType)
+        val readBack = spark.read.schema(schema).parquet(output)
+        assert(readBack.schema.toSet === schema.toSet)
+
+        checkAnswer(
+          readBack,
+          Seq(Row("2017-01-01-00", 1), Row("2017-01-01-01", 2))
+        )
+      } finally {
+        spark.streams.active.foreach(_.stop())
+      }
+    }
+  }
 }

