Posted to commits@spark.apache.org by rx...@apache.org on 2014/03/30 07:02:56 UTC

git commit: [SQL] SPARK-1354 Fix self-joins of parquet relations

Repository: spark
Updated Branches:
  refs/heads/master 92b83959c -> 2861b07bb


[SQL] SPARK-1354 Fix self-joins of parquet relations

@AndreSchumacher, please take a look.

https://spark-project.atlassian.net/browse/SPARK-1354

Author: Michael Armbrust <mi...@databricks.com>

Closes #269 from marmbrus/parquetJoin and squashes the following commits:

4081e77 [Michael Armbrust] Create new instances of Parquet relation when multiple copies are in a single plan.
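
For readers unfamiliar with the bug, here is a minimal self-contained Scala sketch of the idea. All names below (SelfJoinSketch, IdGen, Attribute, Relation) are illustrative stand-ins, not Spark's actual Catalyst types: when both sides of a self-join reference the same relation instance, their output attributes carry identical expression IDs, so a condition like x.myint === y.myint cannot tell the two sides apart; minting a fresh instance per occurrence gives each side distinct IDs.

    import java.util.concurrent.atomic.AtomicLong

    object SelfJoinSketch extends App {
      // Illustrative stand-in for Catalyst's ExprId generator.
      object IdGen {
        private val counter = new AtomicLong(0L)
        def next(): Long = counter.getAndIncrement()
      }

      case class Attribute(name: String, exprId: Long)

      class Relation(val tableName: String, columns: Seq[String]) {
        // Each instance mints fresh expression IDs for its output.
        val output: Seq[Attribute] = columns.map(c => Attribute(c, IdGen.next()))
        // Analogue of MultiInstanceRelation.newInstance: same table, fresh IDs.
        def newInstance(): Relation = new Relation(tableName, columns)
      }

      val parquet = new Relation("testData", Seq("myint", "mystring"))

      // Broken: both join sides share one instance, so attribute IDs collide.
      assert(parquet.output.map(_.exprId) == parquet.output.map(_.exprId))

      // Fixed: a fresh instance on the second side yields distinct IDs.
      assert(parquet.output.map(_.exprId) != parquet.newInstance().output.map(_.exprId))
    }

In the patch itself this re-instancing is hooked into analysis via the MultiInstanceRelation trait (from org.apache.spark.sql.catalyst.analysis), as shown in the diff below.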


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2861b07b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2861b07b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2861b07b

Branch: refs/heads/master
Commit: 2861b07bb030f72769f5b757b4a7d4a635807140
Parents: 92b8395
Author: Michael Armbrust <mi...@databricks.com>
Authored: Sat Mar 29 22:02:53 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Mar 29 22:02:53 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetRelation.scala      | 15 +++++++++++++--
 .../spark/sql/parquet/ParquetQuerySuite.scala    | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2861b07b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 2b825f8..67a34e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -36,7 +36,7 @@ import parquet.schema.{MessageType, MessageTypeParser}
 import parquet.schema.{PrimitiveType => ParquetPrimitiveType}
 import parquet.schema.{Type => ParquetType}
 
-import org.apache.spark.sql.catalyst.analysis.UnresolvedException
+import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, UnresolvedException}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
 import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.types._
@@ -54,7 +54,8 @@ import org.apache.spark.sql.catalyst.types._
  * @param tableName The name of the relation that can be used in queries.
  * @param path The path to the Parquet file.
  */
-case class ParquetRelation(tableName: String, path: String) extends BaseRelation {
+case class ParquetRelation(tableName: String, path: String)
+  extends BaseRelation with MultiInstanceRelation {
 
   /** Schema derived from ParquetFile **/
   def parquetSchema: MessageType =
@@ -74,6 +75,16 @@ case class ParquetRelation(tableName: String, path: String) extends BaseRelation
   // Parquet files have no concepts of keys, therefore no Partitioner
   // Note: we could allow Block level access; needs to be thought through
   override def isPartitioned = false
+
+  override def newInstance = ParquetRelation(tableName, path).asInstanceOf[this.type]
+
+  // Equals must also take into account the output attributes so that we can distinguish between
+  // different instances of the same relation.
+  override def equals(other: Any) = other match {
+    case p: ParquetRelation =>
+      p.tableName == tableName && p.path == path && p.output == output
+    case _ => false
+  }
 }
 
 object ParquetRelation {
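
A note on the equals override above: since ParquetRelation is a case class, the compiler-generated equality compares only the constructor fields (tableName, path), so two instances of the same table would still compare equal even after newInstance minted fresh attribute IDs. The sketch below illustrates the difference; EqualsSketch, Attr, NaiveRel, and Rel are toy classes, not Catalyst's.

    object EqualsSketch extends App {
      case class Attr(name: String, exprId: Long)

      // Compiler-generated case-class equality sees only constructor fields,
      // so distinct instances of the same table are indistinguishable:
      case class NaiveRel(tableName: String, path: String)
      assert(NaiveRel("t", "/t.parquet") == NaiveRel("t", "/t.parquet"))

      // Comparing the output attributes too, as the patch does, keeps
      // instances with different expression IDs distinct:
      class Rel(val tableName: String, val path: String, val output: Seq[Attr]) {
        override def equals(other: Any): Boolean = other match {
          case r: Rel => r.tableName == tableName && r.path == path && r.output == output
          case _      => false
        }
        // Equal objects share tableName and path, so this hashCode still
        // honors the equals/hashCode contract.
        override def hashCode: Int = (tableName, path).hashCode()
      }

      val a = new Rel("t", "/t.parquet", Seq(Attr("myint", 1L)))
      val b = new Rel("t", "/t.parquet", Seq(Attr("myint", 2L)))
      assert(a != b)
    }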

http://git-wip-us.apache.org/repos/asf/spark/blob/2861b07b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 71caa70..ea1733b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -30,6 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 import org.apache.spark.sql.test.TestSQLContext
 
+// Implicits
+import org.apache.spark.sql.test.TestSQLContext._
+
 class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
   override def beforeAll() {
     ParquetTestData.writeFile()
@@ -39,6 +42,22 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
     ParquetTestData.testFile.delete()
   }
 
+  test("self-join parquet files") {
+    val x = ParquetTestData.testData.subquery('x)
+    val y = ParquetTestData.testData.subquery('y)
+    val query = x.join(y).where("x.myint".attr === "y.myint".attr)
+
+    // Check to make sure that the attributes from either side of the join have unique expression
+    // ids.
+    query.queryExecution.analyzed.output.filter(_.name == "myint") match {
+      case Seq(i1, i2) if i1.exprId == i2.exprId =>
+        fail(s"Duplicate expression IDs found in query plan: $query")
+      case Seq(_, _) => // All good
+    }
+
+    // TODO: We can't run this query as it NPEs
+  }
+
   test("Import of simple Parquet file") {
     val result = getRDD(ParquetTestData.testData).collect()
     assert(result.size === 15)
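
The pattern match in the new test handles exactly two attributes named myint. A generalized uniqueness check might look like the sketch below; it is not part of this commit, and Attr is a plain-Scala stand-in for Catalyst attributes.

    object UniqueIdCheckSketch extends App {
      case class Attr(name: String, exprId: Long)

      // Fail if any two attributes sharing a name also share an expression ID.
      def assertUniqueExprIds(output: Seq[Attr], name: String): Unit = {
        val ids = output.filter(_.name == name).map(_.exprId)
        assert(ids.distinct.size == ids.size,
          s"Duplicate expression IDs for '$name': ${ids.mkString(", ")}")
      }

      assertUniqueExprIds(Seq(Attr("myint", 1L), Attr("myint", 2L)), "myint")
    }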