Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/18 21:13:27 UTC

spark git commit: [SQL] Support partitioned parquet tables that have the key in both the directory and the file

Repository: spark
Updated Branches:
  refs/heads/master b54c6ab3c -> 90d72ec85


[SQL] Support partitioned parquet tables that have the key in both the directory and the file

Author: Michael Armbrust <mi...@databricks.com>

Closes #3272 from marmbrus/keyInPartitionedTable and squashes the following commits:

447f08c [Michael Armbrust] Support partitioned parquet tables that have the key in both the directory and the file
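
For context, the layout this patch supports looks roughly like the following
(a hypothetical path; the actual test fixtures are created in
ParquetMetastoreSuite below), where the partition key `p` appears both as a
directory name and as a column inside each data file:

    /tmp/partitioned_table/p=1/part-00000.parquet   (columns: p, intField, stringField)
    /tmp/partitioned_table/p=2/part-00000.parquet   (columns: p, intField, stringField)

Previously the planner unconditionally injected the directory-derived key as
an extra attribute, which presumably clashed with the copy already present in
the files; with this change the key is only injected when the files do not
carry it themselves.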


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90d72ec8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90d72ec8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90d72ec8

Branch: refs/heads/master
Commit: 90d72ec8502f7ec11d2fe42f08c884ad2159266f
Parents: b54c6ab
Author: Michael Armbrust <mi...@databricks.com>
Authored: Tue Nov 18 12:13:23 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Nov 18 12:13:23 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/hive/HiveStrategies.scala  |  18 ++-
 .../sql/parquet/ParquetMetastoreSuite.scala     | 158 +++++++++++--------
 2 files changed, 108 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90d72ec8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3a49ddd..56fc852 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -58,12 +58,18 @@ private[hive] trait HiveStrategies {
       def lowerCase =
         new SchemaRDD(s.sqlContext, s.logicalPlan)
 
-      def addPartitioningAttributes(attrs: Seq[Attribute]) =
-        new SchemaRDD(
-          s.sqlContext,
-          s.logicalPlan transform {
-            case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
-          })
+      def addPartitioningAttributes(attrs: Seq[Attribute]) = {
+        // Don't add the partitioning key if it's already present in the data.
+        if (attrs.map(_.name).toSet.subsetOf(s.logicalPlan.output.map(_.name).toSet)) {
+          s
+        } else {
+          new SchemaRDD(
+            s.sqlContext,
+            s.logicalPlan transform {
+              case p: ParquetRelation => p.copy(partitioningAttributes = attrs)
+            })
+        }
+      }
     }
 
     implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
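
The whole fix is the name-subset guard above. Here is a minimal standalone
sketch of the same idea, using simplified stand-in types rather than the real
Catalyst Attribute/LogicalPlan classes (the real code sets
partitioningAttributes on the ParquetRelation instead of appending to output):

    // Sketch of the guard added in addPartitioningAttributes, with stand-in types.
    object SubsetGuardDemo extends App {
      case class Attribute(name: String)
      case class Plan(output: Seq[Attribute])

      // Inject the directory-derived partition columns only when the data
      // files do not already provide columns with the same names.
      def addPartitioningAttributes(plan: Plan, attrs: Seq[Attribute]): Plan = {
        val existing = plan.output.map(_.name).toSet
        if (attrs.map(_.name).toSet.subsetOf(existing)) plan // key already in the files: no-op
        else Plan(plan.output ++ attrs)                      // key only in the directory: add it
      }

      val p          = Attribute("p")
      val withKey    = Plan(Seq(Attribute("intField"), Attribute("stringField"), p))
      val withoutKey = Plan(Seq(Attribute("intField"), Attribute("stringField")))

      assert(addPartitioningAttributes(withKey, Seq(p)) == withKey)            // unchanged
      assert(addPartitioningAttributes(withoutKey, Seq(p)).output.contains(p)) // p appended
      println("subset guard behaves as expected")
    }

Note that the comparison is by column name only, so a directory key that
shares a name with a data column is treated as the same column.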

http://git-wip-us.apache.org/repos/asf/spark/blob/90d72ec8/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
index 86adbbf..cc65242 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/ParquetMetastoreSuite.scala
@@ -27,7 +27,11 @@ import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.execution.HiveTableScan
 import org.apache.spark.sql.hive.test.TestHive._
 
+// The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
+// The data that also includes the partitioning key.
+case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
+
 
 /**
  * Tests for our SerDe -> Native parquet scan conversion.
@@ -45,6 +49,17 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
         .saveAsParquetFile(partDir.getCanonicalPath)
     }
 
+    val partitionedTableDirWithKey = File.createTempFile("parquettests", "sparksql")
+    partitionedTableDirWithKey.delete()
+    partitionedTableDirWithKey.mkdir()
+
+    (1 to 10).foreach { p =>
+      val partDir = new File(partitionedTableDirWithKey, s"p=$p")
+      sparkContext.makeRDD(1 to 10)
+        .map(i => ParquetDataWithKey(p, i, s"part-$p"))
+        .saveAsParquetFile(partDir.getCanonicalPath)
+    }
+
     sql(s"""
     create external table partitioned_parquet
     (
@@ -60,6 +75,20 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
     """)
 
     sql(s"""
+    create external table partitioned_parquet_with_key
+    (
+      intField INT,
+      stringField STRING
+    )
+    PARTITIONED BY (p int)
+    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+     STORED AS
+     INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+     OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+    location '${partitionedTableDirWithKey.getCanonicalPath}'
+    """)
+
+    sql(s"""
     create external table normal_parquet
     (
       intField INT,
@@ -76,6 +105,10 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
       sql(s"ALTER TABLE partitioned_parquet ADD PARTITION (p=$p)")
     }
 
+    (1 to 10).foreach { p =>
+      sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
+    }
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
@@ -83,75 +116,76 @@ class ParquetMetastoreSuite extends QueryTest with BeforeAndAfterAll {
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
-  test("project the partitioning column") {
-    checkAnswer(
-      sql("SELECT p, count(*) FROM partitioned_parquet group by p"),
-      (1, 10) ::
-      (2, 10) ::
-      (3, 10) ::
-      (4, 10) ::
-      (5, 10) ::
-      (6, 10) ::
-      (7, 10) ::
-      (8, 10) ::
-      (9, 10) ::
-      (10, 10) :: Nil
-    )
-  }
+  Seq("partitioned_parquet", "partitioned_parquet_with_key").foreach { table =>
+    test(s"project the partitioning column $table") {
+      checkAnswer(
+        sql(s"SELECT p, count(*) FROM $table group by p"),
+        (1, 10) ::
+        (2, 10) ::
+        (3, 10) ::
+        (4, 10) ::
+        (5, 10) ::
+        (6, 10) ::
+        (7, 10) ::
+        (8, 10) ::
+        (9, 10) ::
+        (10, 10) :: Nil
+      )
+    }
 
-  test("project partitioning and non-partitioning columns") {
-    checkAnswer(
-      sql("SELECT stringField, p, count(intField) " +
-        "FROM partitioned_parquet GROUP BY p, stringField"),
-      ("part-1", 1, 10) ::
-      ("part-2", 2, 10) ::
-      ("part-3", 3, 10) ::
-      ("part-4", 4, 10) ::
-      ("part-5", 5, 10) ::
-      ("part-6", 6, 10) ::
-      ("part-7", 7, 10) ::
-      ("part-8", 8, 10) ::
-      ("part-9", 9, 10) ::
-      ("part-10", 10, 10) :: Nil
-    )
-  }
+    test(s"project partitioning and non-partitioning columns $table") {
+      checkAnswer(
+        sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
+        ("part-1", 1, 10) ::
+        ("part-2", 2, 10) ::
+        ("part-3", 3, 10) ::
+        ("part-4", 4, 10) ::
+        ("part-5", 5, 10) ::
+        ("part-6", 6, 10) ::
+        ("part-7", 7, 10) ::
+        ("part-8", 8, 10) ::
+        ("part-9", 9, 10) ::
+        ("part-10", 10, 10) :: Nil
+      )
+    }
 
-  test("simple count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet"),
-      100)
-  }
+    test(s"simple count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table"),
+        100)
+    }
 
-  test("pruned count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p = 1"),
-      10)
-  }
+    test(s"pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
+        10)
+    }
 
-  test("multi-partition pruned count") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE p IN (1,2,3)"),
-      30)
-  }
+    test(s"multi-partition pruned count $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
+        30)
+    }
 
-  test("non-partition predicates") {
-    checkAnswer(
-      sql("SELECT COUNT(*) FROM partitioned_parquet WHERE intField IN (1,2,3)"),
-      30)
-  }
+    test(s"non-partition predicates $table") {
+      checkAnswer(
+        sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
+        30)
+    }
 
-  test("sum") {
-    checkAnswer(
-      sql("SELECT SUM(intField) FROM partitioned_parquet WHERE intField IN (1,2,3) AND p = 1"),
-      1 + 2 + 3)
-  }
+    test(s"sum $table") {
+      checkAnswer(
+        sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
+        1 + 2 + 3)
+    }
 
-  test("hive udfs") {
-    checkAnswer(
-      sql("SELECT concat(stringField, stringField) FROM partitioned_parquet"),
-      sql("SELECT stringField FROM partitioned_parquet").map {
-        case Row(s: String) => Row(s + s)
-      }.collect().toSeq)
+    test(s"hive udfs $table") {
+      checkAnswer(
+        sql(s"SELECT concat(stringField, stringField) FROM $table"),
+        sql(s"SELECT stringField FROM $table").map {
+          case Row(s: String) => Row(s + s)
+        }.collect().toSeq)
+    }
   }
 
   test("non-part select(*)") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org