You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/07 21:53:18 UTC

spark git commit: [SPARK-4203][SQL] Partition directories in random order when inserting into hive table

Repository: spark
Updated Branches:
  refs/heads/master a6405c5dd -> ac70c972a


[SPARK-4203][SQL] Partition directories in random order when inserting into hive table

When doing an insert into hive table with partitions the folders written to the file system are in a random order instead of the order defined in table creation. Seems that the loadPartition method in Hive.java has a Map<String,String> parameter but expects to be called with a map that has a defined ordering such as LinkedHashMap. Working on a test but having intillij problems

Author: Matthew Taylor <ma...@tbfe.net>

Closes #3076 from tbfenet/partition_dir_order_problem and squashes the following commits:

f1b9a52 [Matthew Taylor] Comment format fix
bca709f [Matthew Taylor] review changes
0e50f6b [Matthew Taylor] test fix
99f1a31 [Matthew Taylor] partition ordering fix
369e618 [Matthew Taylor] partition ordering fix


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac70c972
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac70c972
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac70c972

Branch: refs/heads/master
Commit: ac70c972a51952f801fd02dd5962c0a0c1aba8f8
Parents: a6405c5
Author: Matthew Taylor <ma...@tbfe.net>
Authored: Fri Nov 7 12:53:08 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Nov 7 12:53:08 2014 -0800

----------------------------------------------------------------------
 .../hive/execution/InsertIntoHiveTable.scala    | 13 ++++++--
 .../sql/hive/InsertIntoHiveTableSuite.scala     | 34 ++++++++++++++++++--
 2 files changed, 43 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac70c972/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 74b4e7a..81390f6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
+import java.util
+
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.hive.common.`type`.HiveVarchar
@@ -203,6 +205,13 @@ case class InsertIntoHiveTable(
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
     val holdDDLTime = false
     if (partition.nonEmpty) {
+
+      // loadPartition call orders directories created on the iteration order of the this map
+      val orderedPartitionSpec = new util.LinkedHashMap[String,String]()
+      table.hiveQlTable.getPartCols().foreach{
+        entry=>
+          orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
+      }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
       db.validatePartitionNameCharacters(partVals)
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
@@ -214,7 +223,7 @@ case class InsertIntoHiveTable(
         db.loadDynamicPartitions(
           outputPath,
           qualifiedTableName,
-          partitionSpec,
+          orderedPartitionSpec,
           overwrite,
           numDynamicPartitions,
           holdDDLTime,
@@ -224,7 +233,7 @@ case class InsertIntoHiveTable(
         db.loadPartition(
           outputPath,
           qualifiedTableName,
-          partitionSpec,
+          orderedPartitionSpec,
           overwrite,
           holdDDLTime,
           inheritTableSpecs,

http://git-wip-us.apache.org/repos/asf/spark/blob/ac70c972/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 18dc937..5dbfb92 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql._
+import java.io.File
+
+import com.google.common.io.Files
+import org.apache.spark.sql.{QueryTest, _}
 import org.apache.spark.sql.hive.test.TestHive
 
 /* Implicits */
@@ -91,4 +93,32 @@ class InsertIntoHiveTableSuite extends QueryTest {
 
     sql("DROP TABLE hiveTableWithMapValue")
   }
+
+  test("SPARK-4203:random partition directory order") {
+    createTable[TestData]("tmp_table")
+    val tmpDir = Files.createTempDir()
+    sql(s"CREATE TABLE table_with_partition(c1 string) PARTITIONED by (p1 string,p2 string,p3 string,p4 string,p5 string) location '${tmpDir.toURI.toString}'  ")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (p1='a',p2='b',p3='c',p4='c',p5='1') SELECT 'blarr' FROM tmp_table")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (p1='a',p2='b',p3='c',p4='c',p5='2') SELECT 'blarr' FROM tmp_table")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (p1='a',p2='b',p3='c',p4='c',p5='3') SELECT 'blarr' FROM tmp_table")
+    sql("INSERT OVERWRITE TABLE table_with_partition  partition (p1='a',p2='b',p3='c',p4='c',p5='4') SELECT 'blarr' FROM tmp_table")
+    def listFolders(path: File, acc: List[String]): List[List[String]] = {
+      val dir = path.listFiles()
+      val folders = dir.filter(_.isDirectory).toList
+      if (folders.isEmpty) {
+        List(acc.reverse)
+      } else {
+        folders.flatMap(x => listFolders(x, x.getName :: acc))
+      }
+    }
+    val expected = List(
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=2"::Nil,
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=3"::Nil ,
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=1"::Nil ,
+      "p1=a"::"p2=b"::"p3=c"::"p4=c"::"p5=4"::Nil
+    )
+    assert(listFolders(tmpDir,List()).sortBy(_.toString()) == expected.sortBy(_.toString))
+    sql("DROP TABLE table_with_partition")
+    sql("DROP TABLE tmp_table")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org