Posted to commits@spark.apache.org by rx...@apache.org on 2015/02/27 05:46:07 UTC

spark git commit: [SPARK-6024][SQL] When a data source table has too many columns, its schema cannot be stored in metastore.

Repository: spark
Updated Branches:
  refs/heads/master 4ad5153f5 -> 5e5ad6558


[SPARK-6024][SQL] When a data source table has too many columns, its schema cannot be stored in metastore.

JIRA: https://issues.apache.org/jira/browse/SPARK-6024

Author: Yin Huai <yh...@databricks.com>

Closes #4795 from yhuai/wideSchema and squashes the following commits:

4882e6f [Yin Huai] Address comments.
73e71b4 [Yin Huai] Address comments.
143927a [Yin Huai] Simplify code.
cc1d472 [Yin Huai] Make the schema wider.
12bacae [Yin Huai] If the JSON string of a schema is too large, split it before storing it in metastore.
e9b4f70 [Yin Huai] Failed test.
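
The fix stores a wide schema by chunking its JSON form: the string is split into pieces no longer than a configurable threshold, each piece is written as its own metastore table property, and the pieces are concatenated again on read. Below is a minimal, standalone Scala sketch of that split-and-reassemble idea; it is not the Spark code from the diff below, and the object and method names are purely illustrative.

object SchemaSplitSketch {
  // Split a (possibly very long) schema JSON string into chunks no longer than
  // `threshold` characters, mirroring the grouped/mkString approach in this commit.
  def split(schemaJson: String, threshold: Int): Seq[String] =
    schemaJson.grouped(threshold).toSeq

  // Concatenating the chunks in order restores the original string exactly.
  def reassemble(parts: Seq[String]): String = parts.mkString

  def main(args: Array[String]): Unit = {
    val json = """{"type":"struct","fields":[""" +
      (1 to 500).map(i => s"""{"name":"c_$i","type":"string","nullable":true}""").mkString(",") +
      "]}"
    val parts = split(json, threshold = 4000)
    assert(reassemble(parts) == json)
    println(s"${json.length} characters stored as ${parts.size} part(s)")
  }
}

Because grouped preserves order and mkString is plain concatenation, the round trip is lossless regardless of how long the schema string is.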


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e5ad655
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e5ad655
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e5ad655

Branch: refs/heads/master
Commit: 5e5ad6558d60cfbf360708584e883e80d363e33e
Parents: 4ad5153
Author: Yin Huai <yh...@databricks.com>
Authored: Thu Feb 26 20:46:05 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Feb 26 20:46:05 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    | 10 +++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 29 ++++++++++++++++----
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 21 ++++++++++++++
 3 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5e5ad655/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index a08c0f5..4815620 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -51,6 +51,11 @@ private[spark] object SQLConf {
 
   // This is used to set the default data source
   val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
+  // This is used to control when we will split a schema's JSON string into multiple
+  // pieces in order to fit the JSON string in the metastore's table property (by default,
+  // the value has a length restriction of 4000 characters). We will split the JSON string
+  // of a schema if its length exceeds the threshold.
+  val SCHEMA_STRING_LENGTH_THRESHOLD = "spark.sql.sources.schemaStringLengthThreshold"
 
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
@@ -177,6 +182,11 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def defaultDataSourceName: String =
     getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
 
+  // Do not use a value larger than 4000 as the default value of this property.
+  // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
+  private[spark] def schemaStringLengthThreshold: Int =
+    getConf(SCHEMA_STRING_LENGTH_THRESHOLD, "4000").toInt
+
   private[spark] def dataFrameEagerAnalysis: Boolean =
     getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5e5ad655/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 8af5a48..d3ad364 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -69,13 +69,23 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val table = synchronized {
           client.getTable(in.database, in.name)
         }
-        val schemaString = table.getProperty("spark.sql.sources.schema")
         val userSpecifiedSchema =
-          if (schemaString == null) {
-            None
-          } else {
-            Some(DataType.fromJson(schemaString).asInstanceOf[StructType])
+          Option(table.getProperty("spark.sql.sources.schema.numParts")).map { numParts =>
+            val parts = (0 until numParts.toInt).map { index =>
+              val part = table.getProperty(s"spark.sql.sources.schema.part.${index}")
+              if (part == null) {
+                throw new AnalysisException(
+                  s"Could not read schema from the metastore because it is corrupted " +
+                  s"(missing part ${index} of the schema).")
+              }
+
+              part
+            }
+            // Stitch all parts back together into a single schema string in the JSON representation
+            // and convert it back to a StructType.
+            DataType.fromJson(parts.mkString).asInstanceOf[StructType]
           }
+
         // It does not appear that the ql client for the metastore has a way to enumerate all the
         // SerDe properties directly...
         val options = table.getTTable.getSd.getSerdeInfo.getParameters.toMap
@@ -119,7 +129,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
     tbl.setProperty("spark.sql.sources.provider", provider)
     if (userSpecifiedSchema.isDefined) {
-      tbl.setProperty("spark.sql.sources.schema", userSpecifiedSchema.get.json)
+      val threshold = hive.conf.schemaStringLengthThreshold
+      val schemaJsonString = userSpecifiedSchema.get.json
+      // Split the JSON string.
+      val parts = schemaJsonString.grouped(threshold).toSeq
+      tbl.setProperty("spark.sql.sources.schema.numParts", parts.size.toString)
+      parts.zipWithIndex.foreach { case (part, index) =>
+        tbl.setProperty(s"spark.sql.sources.schema.part.${index}", part)
+      }
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
 

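Taken together, the write path above fans the schema JSON out across numbered spark.sql.sources.schema.part.N properties (plus a numParts count), and the read path earlier in this file folds them back into one string before calling DataType.fromJson. Below is a hedged round-trip sketch using a plain mutable map in place of a Hive table's property bag; only the property names come from the commit, everything else is illustrative.

import scala.collection.mutable

object SchemaPropertySketch {
  // Write the schema JSON as a part count plus numbered part properties.
  def write(props: mutable.Map[String, String], schemaJson: String, threshold: Int): Unit = {
    val parts = schemaJson.grouped(threshold).toSeq
    props("spark.sql.sources.schema.numParts") = parts.size.toString
    parts.zipWithIndex.foreach { case (part, index) =>
      props(s"spark.sql.sources.schema.part.$index") = part
    }
  }

  // Read the parts back in order and concatenate them; a missing part means
  // the stored schema is corrupted.
  def read(props: mutable.Map[String, String]): Option[String] =
    props.get("spark.sql.sources.schema.numParts").map { numParts =>
      (0 until numParts.toInt).map { index =>
        props.getOrElse(s"spark.sql.sources.schema.part.$index",
          sys.error(s"Could not read schema: missing part $index"))
      }.mkString
    }

  def main(args: Array[String]): Unit = {
    val props = mutable.Map.empty[String, String]
    val schemaJson = "x" * 9000               // stands in for a long schema JSON string
    write(props, schemaJson, threshold = 4000)
    assert(read(props).contains(schemaJson))  // reassembled from 3 parts (4000 + 4000 + 1000)
  }
}
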
http://git-wip-us.apache.org/repos/asf/spark/blob/5e5ad655/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 0bd8277..00306f1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -591,4 +591,25 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
     }
   }
+
+  test("SPARK-6024 wide schema support") {
+    // We will need 80 splits for this schema if the threshold is 4000.
+    val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true)))
+    assert(
+      schema.json.size > conf.schemaStringLengthThreshold,
+      "To correctly test the fix of SPARK-6024, the value of " +
+      s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}")
+    // Manually create a metastore data source table.
+    catalog.createDataSourceTable(
+      tableName = "wide_schema",
+      userSpecifiedSchema = Some(schema),
+      provider = "json",
+      options = Map("path" -> "just a dummy path"),
+      isExternal = false)
+
+    invalidateTable("wide_schema")
+
+    val actualSchema = table("wide_schema").schema
+    assert(schema === actualSchema)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org