Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/31 20:15:33 UTC

git commit: [SPARK-2743][SQL] Resolve original attributes in ParquetTableScan

Repository: spark
Updated Branches:
  refs/heads/master 92ca910eb -> 3072b9602


[SPARK-2743][SQL] Resolve original attributes in ParquetTableScan

Author: Michael Armbrust <mi...@databricks.com>

Closes #1647 from marmbrus/parquetCase and squashes the following commits:

a1799b7 [Michael Armbrust] move comment
2a2a68b [Michael Armbrust] Merge remote-tracking branch 'apache/master' into parquetCase
bb35d5b [Michael Armbrust] Fix test case that produced an invalid plan.
e6870bf [Michael Armbrust] Better error message.
539a2e1 [Michael Armbrust] Resolve original attributes in ParquetTableScan
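
The underlying issue: HiveQL resolves column names case insensitively, so
the attributes handed to ParquetTableScan may not be the very same
attribute objects the relation carries in its schema, while Parquet's own
column lookup is case sensitive. Resolving the incoming attributes back to
the relation's output by exprId sidesteps the casing mismatch. A minimal
sketch of that lookup, mirroring the new `output` val in the diff below
(the method name and parameters are illustrative, not from the patch;
`Attribute` comes from catalyst's expressions package):

    // Resolve each requested attribute against the relation's output by
    // exprId rather than by (case-sensitive) name, failing loudly when an
    // attribute does not belong to the relation.
    def resolveByExprId(
        requested: Seq[Attribute],
        relationOutput: Seq[Attribute]): Seq[Attribute] =
      requested.map { a =>
        relationOutput
          .find(_.exprId == a.exprId)
          .getOrElse(sys.error(s"Invalid parquet attribute $a"))
      }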


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3072b960
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3072b960
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3072b960

Branch: refs/heads/master
Commit: 3072b96026fa3e63e8eef780f2b04dd81f11ea27
Parents: 92ca910
Author: Michael Armbrust <mi...@databricks.com>
Authored: Thu Jul 31 11:15:25 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Jul 31 11:15:25 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetTableOperations.scala | 14 ++++++++++----
 .../spark/sql/parquet/ParquetQuerySuite.scala      | 14 +-------------
 .../spark/sql/parquet/HiveParquetSuite.scala       | 17 +++++++++++++++++
 3 files changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3072b960/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 912a9f0..759a2a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -51,13 +51,20 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext}
  * [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
  */
 case class ParquetTableScan(
-    // note: output cannot be transient, see
-    // https://issues.apache.org/jira/browse/SPARK-1367
-    output: Seq[Attribute],
+    attributes: Seq[Attribute],
     relation: ParquetRelation,
     columnPruningPred: Seq[Expression])
   extends LeafNode {
 
+  // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
+  // by exprId. note: output cannot be transient, see
+  // https://issues.apache.org/jira/browse/SPARK-1367
+  val output = attributes.map { a =>
+    relation.output
+      .find(o => o.exprId == a.exprId)
+      .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
+  }
+
   override def execute(): RDD[Row] = {
     val sc = sqlContext.sparkContext
     val job = new Job(sc.hadoopConfiguration)
@@ -110,7 +117,6 @@ case class ParquetTableScan(
       ParquetTableScan(prunedAttributes, relation, columnPruningPred)
     } else {
       sys.error("Warning: Could not validate Parquet schema projection in pruneColumns")
-      this
     }
   }
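
Note that the trailing `this` removed in the second hunk was unreachable:
sys.error always throws, and because its return type is Nothing (which
conforms to every type) the if/else still type-checks without a fallback
value. For reference, scala.sys.error is defined along these lines:

    // sys.error never returns normally; its Nothing return type makes any
    // statement placed after a call to it dead code.
    def error(message: String): Nothing =
      throw new RuntimeException(message)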
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3072b960/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 561f5b4..8955455 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -209,19 +209,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
   }
 
   test("Projection of simple Parquet file") {
-    SparkPlan.currentContext.set(TestSQLContext)
-    val scanner = new ParquetTableScan(
-      ParquetTestData.testData.output,
-      ParquetTestData.testData,
-      Seq())
-    val projected = scanner.pruneColumns(ParquetTypesConverter
-      .convertToAttributes(MessageTypeParser
-      .parseMessageType(ParquetTestData.subTestSchema)))
-    assert(projected.output.size === 2)
-    val result = projected
-      .execute()
-      .map(_.copy())
-      .collect()
+    val result = ParquetTestData.testData.select('myboolean, 'mylong).collect()
     result.zipWithIndex.foreach {
       case (row, index) => {
           if (index % 3 == 0)
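
The old version of this test hand-built a ParquetTableScan and pruned it
with attributes freshly converted from the Parquet schema. Fresh
attributes carry brand-new exprIds, so under the exprId-based resolution
added above that hand-built plan would now be rejected as invalid, which
is presumably the "invalid plan" the changelog refers to; routing the
projection through the query DSL lets the analyzer resolve it properly. A
small sketch of the underlying behavior (assuming the catalyst imports of
this era):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.types.LongType

    // Each AttributeReference is minted with a fresh, globally unique
    // exprId, so two attributes that merely share a name and type are not
    // the same attribute as far as exprId-based resolution is concerned.
    val a = AttributeReference("mylong", LongType, nullable = false)()
    val b = AttributeReference("mylong", LongType, nullable = false)()
    assert(a.exprId != b.exprId)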

http://git-wip-us.apache.org/repos/asf/spark/blob/3072b960/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 3bfe49a..47526e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
+import java.io.File
+
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
@@ -27,6 +29,8 @@ import org.apache.spark.util.Utils
 // Implicits
 import org.apache.spark.sql.hive.test.TestHive._
 
+case class Cases(lower: String, UPPER: String)
+
 class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
 
   val dirname = Utils.createTempDir()
@@ -55,6 +59,19 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
     Utils.deleteRecursively(dirname)
   }
 
+  test("Case insensitive attribute names") {
+    val tempFile = File.createTempFile("parquet", "")
+    tempFile.delete()
+    sparkContext.parallelize(1 to 10)
+      .map(_.toString)
+      .map(i => Cases(i, i))
+      .saveAsParquetFile(tempFile.getCanonicalPath)
+
+    parquetFile(tempFile.getCanonicalPath).registerAsTable("cases")
+    hql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+    hql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+  }
+
   test("SELECT on Parquet table") {
     val rdd = hql("SELECT * FROM testsource").collect()
     assert(rdd != null)
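
One caveat about the new "Case insensitive attribute names" test: outside
of an assert, ScalaTest's `===` merely produces a comparison result that
is then discarded, so the two SELECT checks cannot actually fail the test.
A sketch of a stricter version of the same checks (converting the
collected Array to a Seq so the comparison is structural):

    assert(hql("SELECT upper FROM cases").collect().map(_.getString(0)).toSeq ===
      (1 to 10).map(_.toString))
    assert(hql("SELECT LOWER FROM cases").collect().map(_.getString(0)).toSeq ===
      (1 to 10).map(_.toString))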