Posted to commits@spark.apache.org by yh...@apache.org on 2015/10/29 15:42:49 UTC

spark git commit: [SPARK-11246] [SQL] Table cache for Parquet broken in 1.5

Repository: spark
Updated Branches:
  refs/heads/master 3bb2a8d75 -> f7a51deeb


[SPARK-11246] [SQL] Table cache for Parquet broken in 1.5

The root cause is that, with spark.sql.hive.convertMetastoreParquet=true (the default), the cached InMemoryRelation of the ParquetRelation cannot be looked up in the cachedData of CacheManager: the key comparison fails even though the LogicalPlan is the same Subquery wrapping the same ParquetRelation.
The fix in this PR overrides LogicalRelation.cleanArgs to return only Seq(relation), so that LogicalPlan.sameResult ignores the expected output attributes, whose exprIds can differ between analyses while the underlying relation stays the same. The comparison then finds the key to the cached InMemoryRelation.
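
As a minimal illustration (a sketch assuming a HiveContext bound to sqlContext; the table name and query are arbitrary), the broken behavior can be observed like this:

    sqlContext.sql("CREATE TABLE t STORED AS PARQUET AS SELECT 1 AS i")
    sqlContext.cacheTable("t")
    // Before this fix, the physical plan below contained a fresh Parquet scan
    // instead of an InMemoryColumnarTableScan, i.e. the cache was silently bypassed:
    println(sqlContext.sql("SELECT * FROM t").queryExecution.sparkPlan)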

Author: xin Wu <xi...@us.ibm.com>

Closes #9326 from xwu0226/spark-11246-commit.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7a51dee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7a51dee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7a51dee

Branch: refs/heads/master
Commit: f7a51deebad1b4c3b970a051f25d286110b94438
Parents: 3bb2a8d
Author: xin Wu <xi...@us.ibm.com>
Authored: Thu Oct 29 07:42:46 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Thu Oct 29 07:42:46 2015 -0700

----------------------------------------------------------------------
 .../sql/execution/datasources/LogicalRelation.scala      |  5 +++++
 .../org/apache/spark/sql/hive/CachedTableSuite.scala     | 11 +++++++++++
 2 files changed, 16 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f7a51dee/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 783252e..219dae8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -62,6 +62,11 @@ case class LogicalRelation(
     case _ => false
   }
 
+  // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need
+  // LogicalRelation.cleanArgs to return Seq(relation), since the expectedOutputAttributes'
+  // exprIds can differ while the relation is still the same.
+  override lazy val cleanArgs: Seq[Any] = Seq(relation)
+
   @transient override lazy val statistics: Statistics = Statistics(
     sizeInBytes = BigInt(relation.sizeInBytes)
   )
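
For intuition, here is a self-contained sketch (hypothetical simplified classes, not Spark's actual ones) of why restricting cleanArgs to the relation makes the cache-key comparison succeed: re-analyzing a plan produces fresh exprIds, but those ids no longer take part in the comparison.

    // Hypothetical model of the comparison; Relation stands in for ParquetRelation.
    case class Relation(path: String)
    case class LogicalRelationLike(relation: Relation, exprIds: Seq[Long]) {
      // Mirror of the override above: only the relation is compared.
      lazy val cleanArgs: Seq[Any] = Seq(relation)
      def sameResult(other: LogicalRelationLike): Boolean = cleanArgs == other.cleanArgs
    }

    val cached   = LogicalRelationLike(Relation("/warehouse/t"), Seq(1L, 2L))
    val reparsed = LogicalRelationLike(Relation("/warehouse/t"), Seq(9L, 10L)) // fresh ids
    assert(cached.sameResult(reparsed)) // true: CacheManager would find the cached entry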

http://git-wip-us.apache.org/repos/asf/spark/blob/f7a51dee/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 9adb378..5c2fc7d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.File
 
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
 import org.apache.spark.storage.RDDBlockId
@@ -203,4 +204,14 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
     sql("DROP TABLE refreshTable")
     Utils.deleteRecursively(tempPath)
   }
+
+  test("SPARK-11246 cache parquet table") {
+    sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1")
+
+    cacheTable("cachedTable")
+    val sparkPlan = sql("SELECT * FROM cachedTable").queryExecution.sparkPlan
+    assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 1)
+
+    sql("DROP TABLE cachedTable")
+  }
 }

