Posted to commits@spark.apache.org by li...@apache.org on 2015/06/08 11:50:50 UTC

spark git commit: [SPARK-7939] [SQL] Add conf to enable/disable partition column type inference

Repository: spark
Updated Branches:
  refs/heads/master eacd4a929 -> 03ef6be9c


[SPARK-7939] [SQL] Add conf to enable/disable partition column type inference

JIRA: https://issues.apache.org/jira/browse/SPARK-7939

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #6503 from viirya/disable_partition_type_inference and squashes the following commits:

3e90470 [Liang-Chi Hsieh] Default to enable type inference and update docs.
455edb1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into disable_partition_type_inference
9a57933 [Liang-Chi Hsieh] Add conf to enable/disable partition column type inference.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03ef6be9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03ef6be9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03ef6be9

Branch: refs/heads/master
Commit: 03ef6be9ce61a13dcd9d8c71298fb4be39119411
Parents: eacd4a9
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Mon Jun 8 17:50:38 2015 +0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Mon Jun 8 17:50:38 2015 +0800

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  6 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |  6 ++
 .../spark/sql/sources/PartitioningUtils.scala   | 48 +++++++-----
 .../apache/spark/sql/sources/interfaces.scala   |  4 +-
 .../ParquetPartitionDiscoverySuite.scala        | 79 +++++++++++++++++++-
 5 files changed, 119 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03ef6be9/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cde5830..40e33f7 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1102,7 +1102,11 @@ root
 {% endhighlight %}
 
 Notice that the data types of the partitioning columns are automatically inferred.  Currently,
-numeric data types and string type are supported.
+numeric data types and string type are supported. Sometimes users may not want to automatically
+infer the data types of the partitioning columns. For these use cases, the automatic type inference
+can be configured by `spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to
+`true`. When type inference is disabled, string type will be used for the partitioning columns.
+
 
 ### Schema merging
 

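A quick user-facing illustration of the new flag (a minimal sketch, assuming a Spark 1.4-style SQLContext bound to `sqlContext`, e.g. in spark-shell; the table path is hypothetical):

    // Disable partition column type inference; discovered partition columns
    // such as a=10/b=hello are then read back as StringType.
    sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

    // Hypothetical partitioned Parquet table; printSchema() should now show the
    // partition columns as strings instead of inferred numeric types.
    val df = sqlContext.read.parquet("/path/to/partitioned/table")
    df.printSchema()
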
http://git-wip-us.apache.org/repos/asf/spark/blob/03ef6be9/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 77c6af2..c778889 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -71,6 +71,9 @@ private[spark] object SQLConf {
   // Whether to perform partition discovery when loading external data sources.  Default to true.
   val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
 
+  // Whether to perform partition column type inference. Default to true.
+  val PARTITION_COLUMN_TYPE_INFERENCE = "spark.sql.sources.partitionColumnTypeInference.enabled"
+
   // The output committer class used by FSBasedRelation. The specified class needs to be a
   // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
   val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
@@ -250,6 +253,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def partitionDiscoveryEnabled() =
     getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
 
+  private[spark] def partitionColumnTypeInferenceEnabled() =
+    getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE, "true").toBoolean
+
   // Do not use a value larger than 4000 as the default value of this property.
   // See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
   private[spark] def schemaStringLengthThreshold: Int =

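The accessor above follows SQLConf's usual pattern: every setting is stored as a string and converted on read, with the default supplied at the call site. A simplified standalone sketch of that pattern (the names and the map are illustrative, not the Spark internals):

    import scala.collection.mutable

    // Illustrative stand-in for SQLConf's string-keyed settings map.
    val settings = mutable.Map[String, String]()

    def getConf(key: String, default: String): String = settings.getOrElse(key, default)

    // Boolean confs default to "true" and are parsed on access.
    def partitionColumnTypeInferenceEnabled(): Boolean =
      getConf("spark.sql.sources.partitionColumnTypeInference.enabled", "true").toBoolean
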
http://git-wip-us.apache.org/repos/asf/spark/blob/03ef6be9/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index c4c99de..9f6ec2e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -72,10 +72,11 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def parsePartitions(
       paths: Seq[Path],
-      defaultPartitionName: String): PartitionSpec = {
+      defaultPartitionName: String,
+      typeInference: Boolean): PartitionSpec = {
     // First, we need to parse every partition's path and see if we can find partition values.
     val pathsWithPartitionValues = paths.flatMap { path =>
-      parsePartition(path, defaultPartitionName).map(path -> _)
+      parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
     }
 
     if (pathsWithPartitionValues.isEmpty) {
@@ -124,7 +125,8 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def parsePartition(
       path: Path,
-      defaultPartitionName: String): Option[PartitionValues] = {
+      defaultPartitionName: String,
+      typeInference: Boolean): Option[PartitionValues] = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
@@ -137,7 +139,7 @@ private[sql] object PartitioningUtils {
         return None
       }
 
-      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
+      val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
       maybeColumn.foreach(columns += _)
       chopped = chopped.getParent
       finished = maybeColumn.isEmpty || chopped.getParent == null
@@ -153,7 +155,8 @@ private[sql] object PartitioningUtils {
 
   private def parsePartitionColumn(
       columnSpec: String,
-      defaultPartitionName: String): Option[(String, Literal)] = {
+      defaultPartitionName: String,
+      typeInference: Boolean): Option[(String, Literal)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
       None
@@ -164,7 +167,7 @@ private[sql] object PartitioningUtils {
       val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
       assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
 
-      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
+      val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference)
       Some(columnName -> literal)
     }
   }
@@ -211,19 +214,28 @@ private[sql] object PartitioningUtils {
    */
   private[sql] def inferPartitionColumnValue(
       raw: String,
-      defaultPartitionName: String): Literal = {
-    // First tries integral types
-    Try(Literal.create(Integer.parseInt(raw), IntegerType))
-      .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
-      // Then falls back to fractional types
-      .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
-      .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
-      .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
-      // Then falls back to string
-      .getOrElse {
-        if (raw == defaultPartitionName) Literal.create(null, NullType)
-        else Literal.create(unescapePathName(raw), StringType)
+      defaultPartitionName: String,
+      typeInference: Boolean): Literal = {
+    if (typeInference) {
+      // First tries integral types
+      Try(Literal.create(Integer.parseInt(raw), IntegerType))
+        .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
+        // Then falls back to fractional types
+        .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
+        .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
+        .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
+        // Then falls back to string
+        .getOrElse {
+          if (raw == defaultPartitionName) Literal.create(null, NullType)
+          else Literal.create(unescapePathName(raw), StringType)
+        }
+    } else {
+      if (raw == defaultPartitionName) {
+        Literal.create(null, NullType)
+      } else {
+        Literal.create(unescapePathName(raw), StringType)
       }
+    }
   }
 
   private val upCastingOrder: Seq[DataType] =

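The `typeInference` branch above leaves the original fallback chain untouched and only adds the bypass. For readers unfamiliar with the Try/orElse cascade it uses, here is a simplified standalone sketch of the same idea, using plain Scala values rather than Catalyst Literals (and omitting the Float/Decimal steps):

    import scala.util.Try

    // Try progressively wider numeric types, then fall back to the raw string.
    def inferValue(raw: String): Any =
      Try(raw.toInt)
        .orElse(Try(raw.toLong))
        .orElse(Try(raw.toDouble))
        .getOrElse(raw)

    inferValue("10")     // 10: Int
    inferValue("10.5")   // 10.5: Double
    inferValue("hello")  // "hello": String
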
http://git-wip-us.apache.org/repos/asf/spark/blob/03ef6be9/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 25887ba..d1547fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -491,9 +491,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
   }
 
   private def discoverPartitions(): PartitionSpec = {
+    val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
     // We use leaf dirs containing data files to discover the schema.
     val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
-    PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
+    PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
+      typeInference)
   }
 
   /**

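Taken together with the parsing changes, the net effect on a discovered layout such as .../a=10/b=hello (the case exercised by the new tests below) is, roughly:

    // With spark.sql.sources.partitionColumnTypeInference.enabled = true (default):
    //   partition schema: StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
    // With the flag set to false:
    //   partition schema: StructType(Seq(StructField("a", StringType), StructField("b", StringType)))
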
http://git-wip-us.apache.org/repos/asf/spark/blob/03ef6be9/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index d9a010a..c2f1cc8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -48,7 +48,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
 
   test("column type inference") {
     def check(raw: String, literal: Literal): Unit = {
-      assert(inferPartitionColumnValue(raw, defaultPartitionName) === literal)
+      assert(inferPartitionColumnValue(raw, defaultPartitionName, true) === literal)
     }
 
     check("10", Literal.create(10, IntegerType))
@@ -60,12 +60,12 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
 
   test("parse partition") {
     def check(path: String, expected: Option[PartitionValues]): Unit = {
-      assert(expected === parsePartition(new Path(path), defaultPartitionName))
+      assert(expected === parsePartition(new Path(path), defaultPartitionName, true))
     }
 
     def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
       val message = intercept[T] {
-        parsePartition(new Path(path), defaultPartitionName).get
+        parsePartition(new Path(path), defaultPartitionName, true).get
       }.getMessage
 
       assert(message.contains(expected))
@@ -105,7 +105,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
 
   test("parse partitions") {
     def check(paths: Seq[String], spec: PartitionSpec): Unit = {
-      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName) === spec)
+      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec)
     }
 
     check(Seq(
@@ -174,6 +174,77 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
       PartitionSpec.emptySpec)
   }
 
+  test("parse partitions with type inference disabled") {
+    def check(paths: Seq[String], spec: PartitionSpec): Unit = {
+      assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec)
+    }
+
+    check(Seq(
+      "hdfs://host:9000/path/a=10/b=hello"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", StringType),
+          StructField("b", StringType))),
+        Seq(Partition(Row("10", "hello"), "hdfs://host:9000/path/a=10/b=hello"))))
+
+    check(Seq(
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", StringType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
+    check(Seq(
+      "hdfs://host:9000/path/_temporary",
+      "hdfs://host:9000/path/a=10/b=20",
+      "hdfs://host:9000/path/a=10.5/b=hello",
+      "hdfs://host:9000/path/a=10.5/_temporary",
+      "hdfs://host:9000/path/a=10.5/_TeMpOrArY",
+      "hdfs://host:9000/path/a=10.5/b=hello/_temporary",
+      "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
+      "hdfs://host:9000/path/_temporary/path",
+      "hdfs://host:9000/path/a=11/_temporary/path",
+      "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", StringType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"),
+          Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
+    check(Seq(
+      s"hdfs://host:9000/path/a=10/b=20",
+      s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", StringType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row("10", "20"), s"hdfs://host:9000/path/a=10/b=20"),
+          Partition(Row(null, "hello"), s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"))))
+
+    check(Seq(
+      s"hdfs://host:9000/path/a=10/b=$defaultPartitionName",
+      s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"),
+      PartitionSpec(
+        StructType(Seq(
+          StructField("a", StringType),
+          StructField("b", StringType))),
+        Seq(
+          Partition(Row("10", null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
+          Partition(Row("10.5", null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+
+    check(Seq(
+      s"hdfs://host:9000/path1",
+      s"hdfs://host:9000/path2"),
+      PartitionSpec.emptySpec)
+  }
+
   test("read partitioned table - normal case") {
     withTempDir { base =>
       for {

