Posted to commits@spark.apache.org by li...@apache.org on 2017/02/09 05:39:30 UTC

spark git commit: [SPARK-19359][SQL] renaming partition should not leave useless directories

Repository: spark
Updated Branches:
  refs/heads/master 64cae22f7 -> 50a991264


[SPARK-19359][SQL] renaming partition should not leave useless directories

## What changes were proposed in this pull request?

The Hive metastore is not case-preserving and stores partition column names in lower case. If Spark SQL creates a table with upper-case partition column names via `HiveExternalCatalog`, then when we rename a partition, it first calls the HiveClient's renamePartition, which creates a new lower-case partition path, and Spark SQL then renames the lower-case path to the upper-case one.
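
For illustration, a minimal reproduction of the scenario (mirroring the regression test below; the table name `t` is just an example, and a Spark session `spark` with Hive support and `spark.implicits._` imported is assumed):

```scala
// A managed table partitioned by upper-case columns. The metastore stores the
// partition columns as `a` and `b` (lower case), while Spark SQL keeps the
// original case `A` and `B` in the table schema.
Seq((1, 2, 3)).toDF("id", "A", "B").write.partitionBy("A", "B").saveAsTable("t")

// Renaming the partition goes through the HiveClient first, which creates the
// lower-case path `a=4/b=5`; Spark SQL then renames it to `A=4/B=5`.
spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)")
```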

However, when we rename a nested path, different file systems behave differently. E.g. on Jenkins, renaming `a=1/b=2` to `A=1/B=2` succeeds but leaves an empty directory `a=1`; on macOS, the renaming doesn't work as expected and results in `a=1/B=2`.
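
The divergence boils down to a single `FileSystem.rename` call on a nested path. A minimal sketch of the old, one-shot rename (paths are illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val tablePath = new Path("/table_path") // illustrative table location
val fs = tablePath.getFileSystem(new Configuration())

// Renaming the whole nested path in one call is what varies across file
// systems: some leave an empty `a=1` behind, others only rename the leaf
// and produce `a=1/B=2`.
fs.rename(new Path(tablePath, "a=1/b=2"), new Path(tablePath, "A=1/B=2"))
```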

This PR renames the partition directory recursively, starting from the first partition column, in `HiveExternalCatalog`, to be compatible with as many file systems as possible; see the sketch below.
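
A simplified sketch of the level-by-level approach (the real implementation is `renamePartitionDirectory` in `HiveExternalCatalog`, shown in the diff below; `renameLevelByLevel` is a hypothetical name and error handling is elided):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Walk the partition columns from the outermost level and rename each
// directory level individually, so a rename never crosses more than one
// path component.
def renameLevelByLevel(fs: FileSystem, tablePath: Path, parts: Seq[(String, String)]): Path = {
  var current = tablePath
  for ((col, value) <- parts) {
    val expected = new Path(current, s"$col=$value")             // e.g. `A=1`
    val actual = new Path(current, s"${col.toLowerCase}=$value") // e.g. `a=1`
    // If the expected directory already exists (e.g. created by an earlier
    // rename of a sibling partition), just descend into it.
    if (!fs.exists(expected)) fs.rename(actual, expected)
    current = expected
  }
  current
}
```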

## How was this patch tested?

New regression test.

Author: Wenchen Fan <we...@databricks.com>

Closes #16837 from cloud-fan/partition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/50a99126
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/50a99126
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/50a99126

Branch: refs/heads/master
Commit: 50a991264c16e4c4126e88668ef4fbd048c782b8
Parents: 64cae22
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Feb 9 00:39:22 2017 -0500
Committer: gatorsmile <ga...@gmail.com>
Committed: Thu Feb 9 00:39:22 2017 -0500

----------------------------------------------------------------------
 .../catalyst/catalog/ExternalCatalogUtils.scala | 17 ++++---
 .../spark/sql/hive/HiveExternalCatalog.scala    | 53 +++++++++++++++++---
 .../PartitionProviderCompatibilitySuite.scala   | 29 +++++++++++
 3 files changed, 84 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/50a99126/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 4331841..58ced54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -108,18 +108,21 @@ object ExternalCatalogUtils {
       partitionColumnNames: Seq[String],
       tablePath: Path): Path = {
     val partitionPathStrings = partitionColumnNames.map { col =>
-      val partitionValue = spec(col)
-      val partitionString = if (partitionValue == null) {
-        DEFAULT_PARTITION_NAME
-      } else {
-        escapePathName(partitionValue)
-      }
-      escapePathName(col) + "=" + partitionString
+      getPartitionPathString(col, spec(col))
     }
     partitionPathStrings.foldLeft(tablePath) { (totalPath, nextPartPath) =>
       new Path(totalPath, nextPartPath)
     }
   }
+
+  def getPartitionPathString(col: String, value: String): String = {
+    val partitionString = if (value == null) {
+      DEFAULT_PARTITION_NAME
+    } else {
+      escapePathName(value)
+    }
+    escapePathName(col) + "=" + partitionString
+  }
 }
 
 object CatalogUtils {

http://git-wip-us.apache.org/repos/asf/spark/blob/50a99126/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 208c8c9..1fc8e8e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -892,21 +892,58 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val hasUpperCasePartitionColumn = partitionColumnNames.exists(col => col.toLowerCase != col)
     if (tableMeta.tableType == MANAGED && hasUpperCasePartitionColumn) {
       val tablePath = new Path(tableMeta.location)
+      val fs = tablePath.getFileSystem(hadoopConf)
       val newParts = newSpecs.map { spec =>
+        val rightPath = renamePartitionDirectory(fs, tablePath, partitionColumnNames, spec)
         val partition = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-        val wrongPath = new Path(partition.location)
-        val rightPath = ExternalCatalogUtils.generatePartitionPath(
-          spec, partitionColumnNames, tablePath)
+        partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
+      }
+      alterPartitions(db, table, newParts)
+    }
+  }
+
+  /**
+   * Rename the partition directory w.r.t. the actual partition columns.
+   *
+   * It recursively renames the partition directory starting from the first partition column, to
+   * be most compatible with different file systems. E.g. in some file systems, renaming
+   * `a=1/b=2` to `A=1/B=2` will result in `a=1/B=2`, while in others the renaming works but
+   * leaves an empty directory `a=1`.
+   */
+  private def renamePartitionDirectory(
+      fs: FileSystem,
+      tablePath: Path,
+      partCols: Seq[String],
+      newSpec: TablePartitionSpec): Path = {
+    import ExternalCatalogUtils.getPartitionPathString
+
+    var currentFullPath = tablePath
+    partCols.foreach { col =>
+      val partValue = newSpec(col)
+      val expectedPartitionString = getPartitionPathString(col, partValue)
+      val expectedPartitionPath = new Path(currentFullPath, expectedPartitionString)
+
+      if (fs.exists(expectedPartitionPath)) {
+        // It is possible that some parent partition directories already exist or don't need to
+        // be renamed. E.g. if the partition columns are `a` and `B`, we don't need to rename
+        // `/table_path/a=1`. Or if we already have a partition directory `A=1/B=2` and rename
+        // another partition to `A=1/B=3`, we will end up with `A=1/B=2` and `a=1/b=3`, and we
+        // should just move `a=1/b=3` into `A=1` with the new name `B=3`.
+      } else {
+        val actualPartitionString = getPartitionPathString(col.toLowerCase, partValue)
+        val actualPartitionPath = new Path(currentFullPath, actualPartitionString)
         try {
-          tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)
+          fs.rename(actualPartitionPath, expectedPartitionPath)
         } catch {
-          case e: IOException => throw new SparkException(
-            s"Unable to rename partition path from $wrongPath to $rightPath", e)
+          case e: IOException =>
+            throw new SparkException("Unable to rename partition path from " +
+              s"$actualPartitionPath to $expectedPartitionPath", e)
         }
-        partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString)))
       }
-      alterPartitions(db, table, newParts)
+      currentFullPath = expectedPartitionPath
     }
+
+    currentFullPath
   }
 
   override def alterPartitions(

http://git-wip-us.apache.org/repos/asf/spark/blob/50a99126/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index dca207a..9638596 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -481,4 +484,30 @@ class PartitionProviderCompatibilitySuite
       assert(spark.sql("show partitions test").count() == 5)
     }
   }
+
+  test("SPARK-19359: renaming partition should not leave useless directories") {
+    withTable("t", "t1") {
+      Seq((1, 2, 3)).toDF("id", "A", "B").write.partitionBy("A", "B").saveAsTable("t")
+      spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)")
+
+      var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      var tablePath = new Path(table.location)
+      val fs = tablePath.getFileSystem(spark.sessionState.newHadoopConf())
+      // The `A=2` directory is still there; we follow this behavior from Hive.
+      assert(fs.listStatus(tablePath)
+        .filterNot(_.getPath.toString.contains("A=2")).count(_.isDirectory) == 1)
+      assert(fs.listStatus(new Path(tablePath, "A=4")).count(_.isDirectory) == 1)
+
+
+      Seq((1, 2, 3, 4)).toDF("id", "A", "b", "C").write.partitionBy("A", "b", "C").saveAsTable("t1")
+      spark.sql("alter table t1 partition(A=2, b=3, C=4) rename to partition(A=4, b=5, C=6)")
+      table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+      tablePath = new Path(table.location)
+      // The `A=2` directory is still there; we follow this behavior from Hive.
+      assert(fs.listStatus(tablePath)
+        .filterNot(_.getPath.toString.contains("A=2")).count(_.isDirectory) == 1)
+      assert(fs.listStatus(new Path(tablePath, "A=4")).count(_.isDirectory) == 1)
+      assert(fs.listStatus(new Path(new Path(tablePath, "A=4"), "b=5")).count(_.isDirectory) == 1)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org