You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/31 20:15:33 UTC
git commit: [SPARK-2743][SQL] Resolve original attributes in ParquetTableScan
Repository: spark
Updated Branches:
refs/heads/master 92ca910eb -> 3072b9602
[SPARK-2743][SQL] Resolve original attributes in ParquetTableScan
Author: Michael Armbrust <mi...@databricks.com>
Closes #1647 from marmbrus/parquetCase and squashes the following commits:
a1799b7 [Michael Armbrust] move comment
2a2a68b [Michael Armbrust] Merge remote-tracking branch 'apache/master' into parquetCase
bb35d5b [Michael Armbrust] Fix test case that produced an invalid plan.
e6870bf [Michael Armbrust] Better error message.
539a2e1 [Michael Armbrust] Resolve original attributes in ParquetTableScan
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3072b960
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3072b960
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3072b960
Branch: refs/heads/master
Commit: 3072b96026fa3e63e8eef780f2b04dd81f11ea27
Parents: 92ca910
Author: Michael Armbrust <mi...@databricks.com>
Authored: Thu Jul 31 11:15:25 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Jul 31 11:15:25 2014 -0700
----------------------------------------------------------------------
.../spark/sql/parquet/ParquetTableOperations.scala | 14 ++++++++++----
.../spark/sql/parquet/ParquetQuerySuite.scala | 14 +-------------
.../spark/sql/parquet/HiveParquetSuite.scala | 17 +++++++++++++++++
3 files changed, 28 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3072b960/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index 912a9f0..759a2a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -51,13 +51,20 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext}
* [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
*/
case class ParquetTableScan(
- // note: output cannot be transient, see
- // https://issues.apache.org/jira/browse/SPARK-1367
- output: Seq[Attribute],
+ attributes: Seq[Attribute],
relation: ParquetRelation,
columnPruningPred: Seq[Expression])
extends LeafNode {
+ // The resolution of Parquet attributes is case sensitive, so we resolve the original attributes
+ // by exprId. note: output cannot be transient, see
+ // https://issues.apache.org/jira/browse/SPARK-1367
+ val output = attributes.map { a =>
+ relation.output
+ .find(o => o.exprId == a.exprId)
+ .getOrElse(sys.error(s"Invalid parquet attribute $a in ${relation.output.mkString(",")}"))
+ }
+
override def execute(): RDD[Row] = {
val sc = sqlContext.sparkContext
val job = new Job(sc.hadoopConfiguration)
@@ -110,7 +117,6 @@ case class ParquetTableScan(
ParquetTableScan(prunedAttributes, relation, columnPruningPred)
} else {
sys.error("Warning: Could not validate Parquet schema projection in pruneColumns")
- this
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3072b960/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 561f5b4..8955455 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -209,19 +209,7 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
}
test("Projection of simple Parquet file") {
- SparkPlan.currentContext.set(TestSQLContext)
- val scanner = new ParquetTableScan(
- ParquetTestData.testData.output,
- ParquetTestData.testData,
- Seq())
- val projected = scanner.pruneColumns(ParquetTypesConverter
- .convertToAttributes(MessageTypeParser
- .parseMessageType(ParquetTestData.subTestSchema)))
- assert(projected.output.size === 2)
- val result = projected
- .execute()
- .map(_.copy())
- .collect()
+ val result = ParquetTestData.testData.select('myboolean, 'mylong).collect()
result.zipWithIndex.foreach {
case (row, index) => {
if (index % 3 == 0)
http://git-wip-us.apache.org/repos/asf/spark/blob/3072b960/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index 3bfe49a..47526e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.parquet
+import java.io.File
+
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
@@ -27,6 +29,8 @@ import org.apache.spark.util.Utils
// Implicits
import org.apache.spark.sql.hive.test.TestHive._
+case class Cases(lower: String, UPPER: String)
+
class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
val dirname = Utils.createTempDir()
@@ -55,6 +59,19 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
Utils.deleteRecursively(dirname)
}
+ test("Case insensitive attribute names") {
+ val tempFile = File.createTempFile("parquet", "")
+ tempFile.delete()
+ sparkContext.parallelize(1 to 10)
+ .map(_.toString)
+ .map(i => Cases(i, i))
+ .saveAsParquetFile(tempFile.getCanonicalPath)
+
+ parquetFile(tempFile.getCanonicalPath).registerAsTable("cases")
+ hql("SELECT upper FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+ hql("SELECT LOWER FROM cases").collect().map(_.getString(0)) === (1 to 10).map(_.toString)
+ }
+
test("SELECT on Parquet table") {
val rdd = hql("SELECT * FROM testsource").collect()
assert(rdd != null)