You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/01/27 18:12:03 UTC

[spark] branch master updated: [SPARK-26716][SQL] FileFormat: the supported types of read/write should be consistent

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 36a2e63  [SPARK-26716][SQL] FileFormat: the supported types of read/write should be consistent
36a2e63 is described below

commit 36a2e6371b4d173c3e03cc0d869c39335a0d7682
Author: Gengliang Wang <ge...@databricks.com>
AuthorDate: Sun Jan 27 10:11:42 2019 -0800

    [SPARK-26716][SQL] FileFormat: the supported types of read/write should be consistent
    
    ## What changes were proposed in this pull request?
    
    1. Remove parameter `isReadPath`. The supported types of read/write should be the same.
    
    2. Disallow reading `NullType` for the ORC data source. #21667 and #21389 assumed that ORC supports reading `NullType` but cannot write it, which is inconsistent. After reading the docs and running some tests, I found that ORC doesn't support `NullType`.
    
    ## How was this patch tested?
    
    Unit test
    
    Closes #23639 from gengliangwang/supportDataType.
    
    Authored-by: Gengliang Wang <ge...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../execution/datasources/DataSourceUtils.scala    |   2 +-
 .../sql/execution/datasources/FileFormat.scala     |   2 +-
 .../execution/datasources/csv/CSVFileFormat.scala  |   4 +-
 .../datasources/json/JsonFileFormat.scala          |  10 +--
 .../execution/datasources/orc/OrcFileFormat.scala  |  12 ++-
 .../datasources/parquet/ParquetFileFormat.scala    |  10 +--
 .../datasources/text/TextFileFormat.scala          |   2 +-
 .../spark/sql/FileBasedDataSourceSuite.scala       | 100 ++++++++-------------
 .../apache/spark/sql/hive/orc/OrcFileFormat.scala  |  12 ++-
 9 files changed, 62 insertions(+), 92 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
index 90cec5e..a32a940 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala
@@ -45,7 +45,7 @@ object DataSourceUtils {
    */
   private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
     schema.foreach { field =>
-      if (!format.supportDataType(field.dataType, isReadPath)) {
+      if (!format.supportDataType(field.dataType)) {
         throw new AnalysisException(
           s"$format data source does not support ${field.dataType.catalogString} data type.")
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 2c162e2..f0b4971 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -156,7 +156,7 @@ trait FileFormat {
    * Returns whether this format supports the given [[DataType]] in read/write path.
    * By default all data types are supported.
    */
-  def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = true
+  def supportDataType(dataType: DataType): Boolean = true
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
index f4f139d..d08a54c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVFileFormat.scala
@@ -153,10 +153,10 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
   override def equals(other: Any): Boolean = other.isInstanceOf[CSVFileFormat]
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 40f55e7..d3f0414 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -140,17 +140,17 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
 
   override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportDataType(keyType) && supportDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _: NullType => true
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
index 14779cd..2a76495 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFileFormat.scala
@@ -235,19 +235,17 @@ class OrcFileFormat
     }
   }
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportDataType(keyType) && supportDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
-
-    case _: NullType => isReadPath
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index f04502d..efa4f3f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -453,17 +453,17 @@ class ParquetFileFormat
     }
   }
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportDataType(keyType) && supportDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _ => false
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
index 0607f7b..f8a24eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/TextFileFormat.scala
@@ -139,7 +139,7 @@ class TextFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean =
+  override def supportDataType(dataType: DataType): Boolean =
     dataType == StringType
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 54299e9..5e67050 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -367,69 +367,43 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSQLContext with Befo
   }
 
   test("SPARK-24204 error handling for unsupported Null data types - csv, parquet, orc") {
-    withTempDir { dir =>
-      val tempDir = new File(dir, "files").getCanonicalPath
-
-      Seq("orc").foreach { format =>
-        // write path
-        var msg = intercept[AnalysisException] {
-          sql("select null").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          spark.udf.register("testType", () => new NullData())
-          sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        // read path
-        // We expect the types below should be passed for backward-compatibility
-
-        // Null type
-        var schema = StructType(StructField("a", NullType, true) :: Nil)
-        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-        spark.read.schema(schema).format(format).load(tempDir).collect()
-
-        // UDT having null data
-        schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
-        spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-        spark.read.schema(schema).format(format).load(tempDir).collect()
-      }
-
-      Seq("parquet", "csv").foreach { format =>
-        // write path
-        var msg = intercept[AnalysisException] {
-          sql("select null").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          spark.udf.register("testType", () => new NullData())
-          sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        // read path
-        msg = intercept[AnalysisException] {
-          val schema = StructType(StructField("a", NullType, true) :: Nil)
-          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-          spark.read.schema(schema).format(format).load(tempDir).collect()
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
-
-        msg = intercept[AnalysisException] {
-          val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
-          spark.range(1).write.format(format).mode("overwrite").save(tempDir)
-          spark.read.schema(schema).format(format).load(tempDir).collect()
-        }.getMessage
-        assert(msg.toLowerCase(Locale.ROOT)
-          .contains(s"$format data source does not support null data type."))
+    // TODO(SPARK-26744): support data type validating in V2 data source, and test V2 as well.
+    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc") {
+      withTempDir { dir =>
+        val tempDir = new File(dir, "files").getCanonicalPath
+
+        Seq("parquet", "csv", "orc").foreach { format =>
+          // write path
+          var msg = intercept[AnalysisException] {
+            sql("select null").write.format(format).mode("overwrite").save(tempDir)
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          msg = intercept[AnalysisException] {
+            spark.udf.register("testType", () => new NullData())
+            sql("select testType()").write.format(format).mode("overwrite").save(tempDir)
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          // read path
+          msg = intercept[AnalysisException] {
+            val schema = StructType(StructField("a", NullType, true) :: Nil)
+            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+            spark.read.schema(schema).format(format).load(tempDir).collect()
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+
+          msg = intercept[AnalysisException] {
+            val schema = StructType(StructField("a", new NullUDT(), true) :: Nil)
+            spark.range(1).write.format(format).mode("overwrite").save(tempDir)
+            spark.read.schema(schema).format(format).load(tempDir).collect()
+          }.getMessage
+          assert(msg.toLowerCase(Locale.ROOT)
+            .contains(s"$format data source does not support null data type."))
+        }
       }
     }
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
index 4e641e3..bfb0a95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileFormat.scala
@@ -181,19 +181,17 @@ class OrcFileFormat extends FileFormat with DataSourceRegister with Serializable
     }
   }
 
-  override def supportDataType(dataType: DataType, isReadPath: Boolean): Boolean = dataType match {
+  override def supportDataType(dataType: DataType): Boolean = dataType match {
     case _: AtomicType => true
 
-    case st: StructType => st.forall { f => supportDataType(f.dataType, isReadPath) }
+    case st: StructType => st.forall { f => supportDataType(f.dataType) }
 
-    case ArrayType(elementType, _) => supportDataType(elementType, isReadPath)
+    case ArrayType(elementType, _) => supportDataType(elementType)
 
     case MapType(keyType, valueType, _) =>
-      supportDataType(keyType, isReadPath) && supportDataType(valueType, isReadPath)
+      supportDataType(keyType) && supportDataType(valueType)
 
-    case udt: UserDefinedType[_] => supportDataType(udt.sqlType, isReadPath)
-
-    case _: NullType => isReadPath
+    case udt: UserDefinedType[_] => supportDataType(udt.sqlType)
 
     case _ => false
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org