You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/01/28 01:17:24 UTC

spark git commit: [SPARK-19359][SQL] clear useless path after rename a partition with upper-case by HiveExternalCatalog

Repository: spark
Updated Branches:
  refs/heads/master bb1a1fe05 -> 1b5ee2003


[SPARK-19359][SQL] clear useless path after rename a partition with upper-case by HiveExternalCatalog

## What changes were proposed in this pull request?

Hive metastore is not case preserving and keep partition columns with lower case names.

If SparkSQL create a table with upper-case partion name use HiveExternalCatalog, when we rename partition, it first call the HiveClient to renamePartition, which will create a new lower case partition path, then SparkSql rename the lower case path to the upper-case.

while if the renamed partition contains more than one depth partition ,e.g. A=1/B=2, hive renamePartition change to a=1/b=2, then SparkSql rename it to A=1/B=2, but the a=1 still exists in the filesystem, we should also delete it.

## How was this patch tested?
unit test added

Author: windpiger <so...@outlook.com>

Closes #16700 from windpiger/clearUselessPathAfterRenamPartition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1b5ee200
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1b5ee200
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1b5ee200

Branch: refs/heads/master
Commit: 1b5ee2003c368d18a5f8c17c2a869ef5770c60a1
Parents: bb1a1fe
Author: windpiger <so...@outlook.com>
Authored: Fri Jan 27 17:17:17 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Jan 27 17:17:17 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveExternalCatalog.scala    | 35 +++++++++++++++++++
 .../PartitionProviderCompatibilitySuite.scala   | 36 ++++++++++++++++++++
 2 files changed, 71 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1b5ee200/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 208c8c9..5be991b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -839,6 +839,26 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     spec.map { case (k, v) => partCols.find(_.equalsIgnoreCase(k)).get -> v }
   }
 
+
+  /**
+   * The partition path created by Hive is in lowercase, while Spark SQL will
+   * rename it with the partition name in partitionColumnNames, and this function
+   * returns the extra lowercase path created by Hive, and then we can delete it.
+   * e.g. /path/A=1/B=2/C=3 is changed to /path/A=4/B=5/C=6, this function returns
+   * /path/a=4
+   */
+  def getExtraPartPathCreatedByHive(
+      spec: TablePartitionSpec,
+      partitionColumnNames: Seq[String],
+      tablePath: Path): Path = {
+    val partColumnNames = partitionColumnNames
+      .take(partitionColumnNames.indexWhere(col => col.toLowerCase != col) + 1)
+      .map(_.toLowerCase)
+
+    ExternalCatalogUtils.generatePartitionPath(lowerCasePartitionSpec(spec),
+      partColumnNames, tablePath)
+  }
+
   override def createPartitions(
       db: String,
       table: String,
@@ -899,6 +919,21 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
           spec, partitionColumnNames, tablePath)
         try {
           tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)
+
+          // If the newSpec contains more than one depth partition, FileSystem.rename just deletes
+          // the leaf(i.e. wrongPath), we should check if wrongPath's parents need to be deleted.
+          // For example, give a newSpec 'A=1/B=2', after calling Hive's client.renamePartitions,
+          // the location path in FileSystem is changed to 'a=1/b=2', which is wrongPath, then
+          // although we renamed it to 'A=1/B=2', 'a=1/b=2' in FileSystem is deleted, but 'a=1'
+          // is still exists, which we also need to delete
+          val delHivePartPathAfterRename = getExtraPartPathCreatedByHive(
+            spec,
+            partitionColumnNames,
+            tablePath)
+
+          if (delHivePartPathAfterRename != wrongPath) {
+            tablePath.getFileSystem(hadoopConf).delete(delHivePartPathAfterRename, true)
+          }
         } catch {
           case e: IOException => throw new SparkException(
             s"Unable to rename partition path from $wrongPath to $rightPath", e)

http://git-wip-us.apache.org/repos/asf/spark/blob/1b5ee200/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index dca207a..1214a92 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -19,8 +19,11 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.{AnalysisException, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
@@ -481,4 +484,37 @@ class PartitionProviderCompatibilitySuite
       assert(spark.sql("show partitions test").count() == 5)
     }
   }
+
+  test("partition path created by Hive should be deleted after renamePartitions with upper-case") {
+    withTable("t", "t1", "t2") {
+      Seq((1, 2, 3)).toDF("id", "A", "B").write.partitionBy("A", "B").saveAsTable("t")
+      spark.sql("alter table t partition(A=2, B=3) rename to partition(A=4, B=5)")
+
+      var table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+      var extraHivePath = new Path(table.location + "/a=4")
+      assert(!extraHivePath.getFileSystem(spark.sessionState.newHadoopConf())
+        .exists(extraHivePath), "partition path created by Hive should be deleted " +
+        "after renamePartitions with upper-case")
+
+      Seq((1, 2, 3, 4)).toDF("id", "A", "B", "C").write.partitionBy("A", "B", "C").saveAsTable("t1")
+      spark.sql("alter table t1 partition(A=2, B=3, C=4) rename to partition(A=5, B=6, C=7)")
+      table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1"))
+      extraHivePath = new Path(table.location + "/a=5")
+      assert(!extraHivePath.getFileSystem(spark.sessionState.newHadoopConf())
+        .exists(extraHivePath), "partition path created by Hive should be deleted " +
+        "after renamePartitions with upper-case")
+
+      Seq((1, 2, 3, 4)).toDF("id", "a", "B", "C").write.partitionBy("a", "B", "C").saveAsTable("t2")
+      spark.sql("alter table t2 partition(a=2, B=3, C=4) rename to partition(a=4, B=5, C=6)")
+      table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
+      val partPath = new Path(table.location + "/a=4")
+      assert(partPath.getFileSystem(spark.sessionState.newHadoopConf())
+        .exists(partPath), "partition path of lower-case partition name should not be deleted")
+
+      extraHivePath = new Path(table.location + "/a=4/b=5")
+      assert(!extraHivePath.getFileSystem(spark.sessionState.newHadoopConf())
+        .exists(extraHivePath), "partition path created by Hive should be deleted " +
+        "after renamePartitions with upper-case")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org