Posted to commits@spark.apache.org by we...@apache.org on 2017/03/15 00:24:48 UTC

spark git commit: [SPARK-19887][SQL] dynamic partition keys can be null or empty string

Repository: spark
Updated Branches:
  refs/heads/master 7ded39c22 -> dacc382f0


[SPARK-19887][SQL] dynamic partition keys can be null or empty string

## What changes were proposed in this pull request?

When a dynamic partition value is null or an empty string, we should write the data to a directory like `a=__HIVE_DEFAULT_PARTITION__`. When we read the data back, we should recognize this special directory name and treat the value as null.

This matches the behavior of Impala; see https://issues.apache.org/jira/browse/IMPALA-252
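
As an illustration, a minimal sketch of the intended end-to-end behavior (assuming a `SparkSession` named `spark` with `spark.implicits._` in scope; the table name `t` is hypothetical):

```scala
// Rows whose partition value is null or "" all land in the
// __HIVE_DEFAULT_PARTITION__ directory and are read back as null.
Seq((1, "x"), (2, null), (3, "")).toDF("a", "p")
  .write.partitionBy("p").saveAsTable("t")

// Expected layout under the table location (warehouse path elided):
//   .../t/p=x/
//   .../t/p=__HIVE_DEFAULT_PARTITION__/   <- rows 2 and 3
spark.table("t").where($"p".isNull).show()   // rows 2 and 3
```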

## How was this patch tested?

A new regression test.

Author: Wenchen Fan <we...@databricks.com>

Closes #17277 from cloud-fan/partition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dacc382f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dacc382f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dacc382f

Branch: refs/heads/master
Commit: dacc382f0c918f1ca808228484305ce0e21c705e
Parents: 7ded39c
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Mar 15 08:24:41 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Mar 15 08:24:41 2017 +0800

----------------------------------------------------------------------
 .../catalyst/catalog/ExternalCatalogUtils.scala |  2 +-
 .../spark/sql/catalyst/catalog/interface.scala  |  9 ++++++--
 .../sql/execution/DataSourceScanExec.scala      |  2 +-
 .../datasources/FileFormatWriter.scala          | 11 ++++-----
 .../datasources/PartitioningUtils.scala         |  3 +--
 .../spark/sql/hive/HiveExternalCatalog.scala    |  4 ++--
 .../PartitionProviderCompatibilitySuite.scala   | 24 +++++++++++++++++++-
 7 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dacc382f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index a418edc..a8693dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -118,7 +118,7 @@ object ExternalCatalogUtils {
   }
 
   def getPartitionPathString(col: String, value: String): String = {
-    val partitionString = if (value == null) {
+    val partitionString = if (value == null || value.isEmpty) {
       DEFAULT_PARTITION_NAME
     } else {
       escapePathName(value)
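
With this change, `getPartitionPathString` maps both null and empty-string values to the default partition name. Roughly, the expected outputs look like this (a sketch; the column name and values are illustrative):

```scala
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

ExternalCatalogUtils.getPartitionPathString("b", "p")   // "b=p"
ExternalCatalogUtils.getPartitionPathString("b", "")    // "b=__HIVE_DEFAULT_PARTITION__"
ExternalCatalogUtils.getPartitionPathString("b", null)  // "b=__HIVE_DEFAULT_PARTITION__"
```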

http://git-wip-us.apache.org/repos/asf/spark/blob/dacc382f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index b862dea..70ed44e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -116,7 +116,12 @@ case class CatalogTablePartition(
     val timeZoneId = caseInsensitiveProperties.getOrElse(
       DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
     InternalRow.fromSeq(partitionSchema.map { field =>
-      Cast(Literal(spec(field.name)), field.dataType, Option(timeZoneId)).eval()
+      val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
+        null
+      } else {
+        spec(field.name)
+      }
+      Cast(Literal(partValue), field.dataType, Option(timeZoneId)).eval()
     })
   }
 }
@@ -164,7 +169,7 @@ case class BucketSpec(
  * @param tracksPartitionsInCatalog whether this table's partition metadata is stored in the
  *                                  catalog. If false, it is inferred automatically based on file
  *                                  structure.
- * @param schemaPresevesCase Whether or not the schema resolved for this table is case-sensitive.
+ * @param schemaPreservesCase Whether or not the schema resolved for this table is case-sensitive.
  *                           When using a Hive Metastore, this flag is set to false if a case-
  *                           sensitive schema was unable to be read from the table properties.
  *                           Used to trigger case-sensitive schema inference at query time, when

http://git-wip-us.apache.org/repos/asf/spark/blob/dacc382f/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 39b010e..8ebad67 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -319,7 +319,7 @@ case class FileSourceScanExec(
     val input = ctx.freshName("input")
     ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
     val exprRows = output.zipWithIndex.map{ case (a, i) =>
-      new BoundReference(i, a.dataType, a.nullable)
+      BoundReference(i, a.dataType, a.nullable)
     }
     val row = ctx.freshName("row")
     ctx.INPUT_ROW = row

http://git-wip-us.apache.org/repos/asf/spark/blob/dacc382f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index ce33298..7957224 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -335,14 +335,11 @@ object FileFormatWriter extends Logging {
     /** Expressions that given partition columns build a path string like: col1=val/col2=val/... */
     private def partitionPathExpression: Seq[Expression] = {
       desc.partitionColumns.zipWithIndex.flatMap { case (c, i) =>
-        val escaped = ScalaUDF(
-          ExternalCatalogUtils.escapePathName _,
+        val partitionName = ScalaUDF(
+          ExternalCatalogUtils.getPartitionPathString _,
           StringType,
-          Seq(Cast(c, StringType, Option(desc.timeZoneId))),
-          Seq(StringType))
-        val str = If(IsNull(c), Literal(ExternalCatalogUtils.DEFAULT_PARTITION_NAME), escaped)
-        val partitionName = Literal(ExternalCatalogUtils.escapePathName(c.name) + "=") :: str :: Nil
-        if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
+          Seq(Literal(c.name), Cast(c, StringType, Option(desc.timeZoneId))))
+        if (i == 0) Seq(partitionName) else Seq(Literal(Path.SEPARATOR), partitionName)
       }
     }
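
The writer now builds each dynamic partition path from per-column fragments produced by `getPartitionPathString`, so the separate `If(IsNull(...))` branch is no longer needed. A simplified sketch of how the fragments combine (illustrative only, not the actual expression tree):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils

// Fragments for a row with partition values b = null and c = "2".
val fragments = Seq("b" -> (null: String), "c" -> "2").map { case (name, value) =>
  ExternalCatalogUtils.getPartitionPathString(name, value)
}
val partitionPath = fragments.mkString(Path.SEPARATOR)
// partitionPath == "b=__HIVE_DEFAULT_PARTITION__/c=2"
```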
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dacc382f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 09876bb..0398092 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
 
 // TODO: We should tighten up visibility of the classes here once we clean up Hive coupling.
 
@@ -129,7 +128,7 @@ object PartitioningUtils {
       //   "hdfs://host:9000/invalidPath"
       //   "hdfs://host:9000/path"
       // TODO: Selective case sensitivity.
-      val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x).map(_.toString.toLowerCase())
+      val discoveredBasePaths = optDiscoveredBasePaths.flatten.map(_.toString.toLowerCase())
       assert(
         discoveredBasePaths.distinct.size == 1,
         "Conflicting directory structures detected. Suspicious paths:\b" +

http://git-wip-us.apache.org/repos/asf/spark/blob/dacc382f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8860b7d..8a3c81a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1012,8 +1012,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
     val clientPartitionNames =
       client.getPartitionNames(catalogTable, partialSpec.map(lowerCasePartitionSpec))
-    clientPartitionNames.map { partName =>
-      val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partName)
+    clientPartitionNames.map { partitionPath =>
+      val partSpec = PartitioningUtils.parsePathFragmentAsSeq(partitionPath)
       partSpec.map { case (partName, partValue) =>
         partColNameMap(partName.toLowerCase) + "=" + escapePathName(partValue)
       }.mkString("/")

http://git-wip-us.apache.org/repos/asf/spark/blob/dacc382f/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 9638596..9440a17 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
@@ -316,6 +316,28 @@ class PartitionProviderCompatibilitySuite
         }
       }
     }
+
+    test(s"SPARK-19887 partition value is null - partition management $enabled") {
+      withTable("test") {
+        Seq((1, "p", 1), (2, null, 2)).toDF("a", "b", "c")
+          .write.partitionBy("b", "c").saveAsTable("test")
+        checkAnswer(spark.table("test"),
+          Row(1, "p", 1) :: Row(2, null, 2) :: Nil)
+
+        Seq((3, null: String, 3)).toDF("a", "b", "c")
+          .write.mode("append").partitionBy("b", "c").saveAsTable("test")
+        checkAnswer(spark.table("test"),
+          Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Nil)
+        // make sure partition pruning also works.
+        checkAnswer(spark.table("test").filter($"b".isNotNull), Row(1, "p", 1))
+
+        // empty string is an invalid partition value and we treat it as null when read back.
+        Seq((4, "", 4)).toDF("a", "b", "c")
+          .write.mode("append").partitionBy("b", "c").saveAsTable("test")
+        checkAnswer(spark.table("test"),
+          Row(1, "p", 1) :: Row(2, null, 2) :: Row(3, null, 3) :: Row(4, null, 4) :: Nil)
+      }
+    }
   }
 
   /**

