You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2015/04/04 18:24:20 UTC

spark git commit: [SPARK-6607][SQL] Check invalid characters for Parquet schema and show error messages

Repository: spark
Updated Branches:
  refs/heads/master da25c86d6 -> 7bca62f79


[SPARK-6607][SQL] Check invalid characters for Parquet schema and show error messages

'(' and ')' are special characters used in Parquet schemas for type annotation. When we run an aggregation query, we obtain attribute names such as "MAX(a)".

If we directly store the generated DataFrame as a Parquet file, reading it back fails when the stored schema string is parsed.

Several methods can be adopted to solve this. This PR uses the simplest one: replacing attribute names before generating the Parquet schema from these attributes.

Another possible method might be modifying all aggregation expression names from "func(column)" to "func[column]".

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #5263 from viirya/parquet_aggregation_name and squashes the following commits:

2d70542 [Liang-Chi Hsieh] Address comment.
463dff4 [Liang-Chi Hsieh] Instead of replacing special chars, showing error message to user to suggest using Alias.
1de001d [Liang-Chi Hsieh] Replace special characters '(' and ')' of Parquet schema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7bca62f7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7bca62f7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7bca62f7

Branch: refs/heads/master
Commit: 7bca62f79056e592cf07b49d8b8d04c59dea25fc
Parents: da25c86
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Sun Apr 5 00:20:43 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Sun Apr 5 00:20:43 2015 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/parquet/ParquetTypes.scala | 14 ++++++++++++++
 .../org/apache/spark/sql/hive/parquetSuites.scala   | 16 ++++++++++++++++
 2 files changed, 30 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7bca62f7/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index da668f0..60e1bec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -390,6 +390,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
 
   def convertFromAttributes(attributes: Seq[Attribute],
                             toThriftSchemaNames: Boolean = false): MessageType = {
+    checkSpecialCharacters(attributes)
     val fields = attributes.map(
       attribute =>
         fromDataType(attribute.dataType, attribute.name, attribute.nullable,
@@ -404,7 +405,20 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
   }
 
+  /** Fails via `sys.error` when an attribute name contains a Parquet schema special character. */
+  private def checkSpecialCharacters(schema: Seq[Attribute]): Unit = {
+    // Scan per character: in `String.matches`, `.` cannot cross line terminators.
+    val specialChars = " ,;{}()\n\t="
+    schema.map(_.name).find(_.exists(c => specialChars.indexOf(c) >= 0)).foreach { name =>
+      // Doubled backslashes: the `s` interpolator would otherwise expand \n and \t in the message.
+      sys.error(
+        s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=". """ +
+          "Please use alias to rename it.")
+    }
+  }
+
   def convertToString(schema: Seq[Attribute]): String = {
+    checkSpecialCharacters(schema)  // fail fast (sys.error) on names with Parquet special chars
     StructType.fromAttributes(schema).json
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7bca62f7/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 1319c81..5f71e1b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -688,6 +688,22 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
 
     sql("DROP TABLE alwaysNullable")
   }
+
+  test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
+    val tempDir = Utils.createTempDir()
+    val filePath = new File(tempDir, "testParquet").getCanonicalPath
+    val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath
+    // Un-aliased aggregate over a self-join: saving df2 as Parquet is expected to throw.
+    val df = Seq(1,2,3).map(i => (i, i.toString)).toDF("int", "str")
+    val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
+    intercept[RuntimeException](df2.saveAsParquetFile(filePath))
+    // After renaming the columns via toDF, the write succeeds and the data reads back unchanged.
+    val df3 = df2.toDF("str", "max_int")
+    df3.saveAsParquetFile(filePath2)
+    val df4 = parquetFile(filePath2)
+    checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
+    assert(df4.columns === Array("str", "max_int"))
+  }
 }
 
 class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org