You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2016/01/12 23:19:57 UTC

spark git commit: [SPARK-12724] SQL generation support for persisted data source tables

Repository: spark
Updated Branches:
  refs/heads/master 0d543b98f -> 8ed5f12d2


[SPARK-12724] SQL generation support for persisted data source tables

This PR implements SQL generation support for persisted data source tables.  A new field `metastoreTableIdentifier: Option[TableIdentifier]` is added to `LogicalRelation`.  When a `LogicalRelation` representing a persisted data source relation is created, this field holds the database name and table name of the relation.

Author: Cheng Lian <li...@databricks.com>

Closes #10712 from liancheng/spark-12724-datasources-sql-gen.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ed5f12d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ed5f12d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ed5f12d

Branch: refs/heads/master
Commit: 8ed5f12d2bb408bd37e4156b5f1bad9a6b8c3cb5
Parents: 0d543b9
Author: Cheng Lian <li...@databricks.com>
Authored: Tue Jan 12 14:19:53 2016 -0800
Committer: Cheng Lian <li...@databricks.com>
Committed: Tue Jan 12 14:19:53 2016 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/DataFrame.scala |  2 +-
 .../execution/datasources/DataSourceStrategy.scala  | 16 ++++++++--------
 .../sql/execution/datasources/LogicalRelation.scala |  8 +++++---
 .../datasources/parquet/ParquetRelation.scala       | 10 ++--------
 .../spark/sql/execution/datasources/rules.scala     | 16 ++++++++--------
 .../datasources/parquet/ParquetFilterSuite.scala    |  2 +-
 .../parquet/ParquetPartitionDiscoverySuite.scala    |  2 +-
 .../spark/sql/sources/FilteredScanSuite.scala       |  2 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala       |  6 ++++--
 .../org/apache/spark/sql/hive/SQLBuilder.scala      | 14 +++++---------
 .../apache/spark/sql/hive/execution/commands.scala  |  2 +-
 .../spark/sql/hive/LogicalPlanToSQLSuite.scala      | 10 ++++++++++
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala  |  2 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala    |  2 +-
 .../apache/spark/sql/hive/orc/OrcFilterSuite.scala  |  2 +-
 .../org/apache/spark/sql/hive/parquetSuites.scala   |  8 ++++----
 .../spark/sql/sources/hadoopFsRelationSuites.scala  |  2 +-
 17 files changed, 55 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 60d2f05..91bf2f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1728,7 +1728,7 @@ class DataFrame private[sql](
    */
   def inputFiles: Array[String] = {
     val files: Seq[String] = logicalPlan.collect {
-      case LogicalRelation(fsBasedRelation: FileRelation, _) =>
+      case LogicalRelation(fsBasedRelation: FileRelation, _, _) =>
         fsBasedRelation.inputFiles
       case fr: FileRelation =>
         fr.inputFiles

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 1d6290e..da9320f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -41,7 +41,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
  */
 private[sql] object DataSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) =>
       pruneFilterProjectRaw(
         l,
         projects,
@@ -49,14 +49,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (requestedColumns, allPredicates, _) =>
           toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _, _)) =>
       pruneFilterProject(
         l,
         projects,
         filters,
         (a, f) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f))) :: Nil
 
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedScan, _, _)) =>
       pruneFilterProject(
         l,
         projects,
@@ -64,7 +64,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (a, _) => toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray))) :: Nil
 
     // Scanning partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _))
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _))
         if t.partitionSpec.partitionColumns.nonEmpty =>
       // We divide the filter expressions into 3 parts
       val partitionColumns = AttributeSet(
@@ -118,7 +118,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       ).getOrElse(scan) :: Nil
 
     // Scanning non-partitioned HadoopFsRelation
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
+    case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _, _)) =>
       // See buildPartitionedTableScan for the reason that we need to create a shard
       // broadcast HadoopConf.
       val sharedHadoopConf = SparkHadoopUtil.get.conf
@@ -130,16 +130,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         filters,
         (a, f) => t.buildInternalScan(a.map(_.name).toArray, f, t.paths, confBroadcast)) :: Nil
 
-    case l @ LogicalRelation(baseRelation: TableScan, _) =>
+    case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
       execution.PhysicalRDD.createFromDataSource(
         l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
 
-    case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _),
+    case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
       part, query, overwrite, false) if part.isEmpty =>
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: HadoopFsRelation, _), part, query, overwrite, false) =>
+      l @ LogicalRelation(t: HadoopFsRelation, _, _), part, query, overwrite, false) =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
       execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 219dae8..fa97f3d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.datasources
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
@@ -30,7 +31,8 @@ import org.apache.spark.sql.sources.BaseRelation
  */
 case class LogicalRelation(
     relation: BaseRelation,
-    expectedOutputAttributes: Option[Seq[Attribute]] = None)
+    expectedOutputAttributes: Option[Seq[Attribute]] = None,
+    metastoreTableIdentifier: Option[TableIdentifier] = None)
   extends LeafNode with MultiInstanceRelation {
 
   override val output: Seq[AttributeReference] = {
@@ -49,7 +51,7 @@ case class LogicalRelation(
 
   // Logical Relations are distinct if they have different output for the sake of transformations.
   override def equals(other: Any): Boolean = other match {
-    case l @ LogicalRelation(otherRelation, _) => relation == otherRelation && output == l.output
+    case l @ LogicalRelation(otherRelation, _, _) => relation == otherRelation && output == l.output
     case _ => false
   }
 
@@ -58,7 +60,7 @@ case class LogicalRelation(
   }
 
   override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
-    case LogicalRelation(otherRelation, _) => relation == otherRelation
+    case LogicalRelation(otherRelation, _, _) => relation == otherRelation
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 7754edc..991a5d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -44,9 +44,9 @@ import org.apache.spark.{Logging, Partition => SparkPartition, SparkException}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{RDD, SqlNewHadoopPartition, SqlNewHadoopRDD}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.LegacyTypeStringParser
-import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.{PartitionSpec, _}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
@@ -147,12 +147,6 @@ private[sql] class ParquetRelation(
     .get(ParquetRelation.METASTORE_SCHEMA)
     .map(DataType.fromJson(_).asInstanceOf[StructType])
 
-  // If this relation is converted from a Hive metastore table, this method returns the name of the
-  // original Hive metastore table.
-  private[sql] def metastoreTableName: Option[TableIdentifier] = {
-    parameters.get(ParquetRelation.METASTORE_TABLE_NAME).map(SqlParser.parseTableIdentifier)
-  }
-
   private lazy val metadataCache: MetadataCache = {
     val meta = new MetadataCache
     meta.refresh()

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 1c773e6..dd3e66d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -61,7 +61,7 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
 
       // We are inserting into an InsertableRelation or HadoopFsRelation.
       case i @ InsertIntoTable(
-      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _), _, child, _, _) => {
+      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _), _, child, _, _) =>
         // First, make sure the data to be inserted have the same number of fields with the
         // schema of the relation.
         if (l.output.size != child.output.size) {
@@ -70,7 +70,6 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
               s"statement generates the same number of columns as its schema.")
         }
         castAndRenameChildOutput(i, l.output, child)
-      }
   }
 
   /** If necessary, cast data types and rename fields to the expected types and names. */
@@ -108,14 +107,15 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case i @ logical.InsertIntoTable(
-        l @ LogicalRelation(t: InsertableRelation, _), partition, query, overwrite, ifNotExists) =>
+        l @ LogicalRelation(t: InsertableRelation, _, _),
+        partition, query, overwrite, ifNotExists) =>
         // Right now, we do not support insert into a data source table with partition specs.
         if (partition.nonEmpty) {
           failAnalysis(s"Insert into a partition is not allowed because $l is not partitioned.")
         } else {
           // Get all input data source relations of the query.
           val srcRelations = query.collect {
-            case LogicalRelation(src: BaseRelation, _) => src
+            case LogicalRelation(src: BaseRelation, _, _) => src
           }
           if (srcRelations.contains(t)) {
             failAnalysis(
@@ -126,7 +126,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
         }
 
       case logical.InsertIntoTable(
-        LogicalRelation(r: HadoopFsRelation, _), part, query, overwrite, _) =>
+        LogicalRelation(r: HadoopFsRelation, _, _), part, query, overwrite, _) =>
         // We need to make sure the partition columns specified by users do match partition
         // columns of the relation.
         val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
@@ -145,7 +145,7 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
 
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
-          case LogicalRelation(src: BaseRelation, _) => src
+          case LogicalRelation(src: BaseRelation, _, _) => src
         }
         if (srcRelations.contains(r)) {
           failAnalysis(
@@ -173,10 +173,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           EliminateSubQueries(catalog.lookupRelation(c.tableIdent)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
-            case l @ LogicalRelation(dest: BaseRelation, _) =>
+            case l @ LogicalRelation(dest: BaseRelation, _, _) =>
               // Get all input data source relations of the query.
               val srcRelations = c.child.collect {
-                case LogicalRelation(src: BaseRelation, _) => src
+                case LogicalRelation(src: BaseRelation, _, _) => src
               }
               if (srcRelations.contains(dest)) {
                 failAnalysis(

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 587aa5f..97c5313 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -59,7 +59,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
 
         var maybeRelation: Option[ParquetRelation] = None
         val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-          case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _)) =>
+          case PhysicalOperation(_, filters, LogicalRelation(relation: ParquetRelation, _, _)) =>
             maybeRelation = Some(relation)
             filters
         }.flatten.reduceLeftOption(_ && _)

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 0feb945..3d1677b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -563,7 +563,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       (1 to 10).map(i => (i, i.toString)).toDF("a", "b").write.parquet(dir.getCanonicalPath)
       val queryExecution = sqlContext.read.parquet(dir.getCanonicalPath).queryExecution
       queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: ParquetRelation, _) =>
+        case LogicalRelation(relation: ParquetRelation, _, _) =>
           assert(relation.partitionSpec === PartitionSpec.emptySpec)
       }.getOrElse {
         fail(s"Expecting a ParquetRelation2, but got:\n$queryExecution")

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 398b8a1..7196b6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -317,7 +317,7 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
 
       val table = caseInsensitiveContext.table("oneToTenFiltered")
       val relation = table.queryExecution.logical.collectFirst {
-        case LogicalRelation(r, _) => r
+        case LogicalRelation(r, _, _) => r
       }.get
 
       assert(

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 67228f3..daaa5a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -184,7 +184,9 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
             table.properties("spark.sql.sources.provider"),
             options)
 
-        LogicalRelation(resolvedRelation.relation)
+        LogicalRelation(
+          resolvedRelation.relation,
+          metastoreTableIdentifier = Some(TableIdentifier(in.name, Some(in.database))))
       }
     }
 
@@ -447,7 +449,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
         partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) =>
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation, _, _) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 61e3f18..e83b4bf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -19,14 +19,14 @@ package org.apache.spark.sql.hive
 
 import java.util.concurrent.atomic.AtomicLong
 
-import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.Logging
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
 import org.apache.spark.sql.catalyst.optimizer.ProjectCollapsing
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 
 /**
  * A builder class used to convert a resolved logical plan into a SQL query string.  Note that this
@@ -135,13 +135,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         rightSQL <- toSQL(right)
       } yield s"$leftSQL UNION ALL $rightSQL"
 
-    // ParquetRelation converted from Hive metastore table
-    case Subquery(alias, LogicalRelation(r: ParquetRelation, _)) =>
-      // There seems to be a bug related to `ParquetConversions` analysis rule.  The problem is
-      // that, the metastore database name and table name are not always propagated to converted
-      // `ParquetRelation` instances via data source options.  Here we use subquery alias as a
-      // workaround.
-      Some(s"`$alias`")
+    // Persisted data source relation
+    case Subquery(alias, LogicalRelation(_, _, Some(TableIdentifier(table, Some(database))))) =>
+      Some(s"`$database`.`$table`")
 
     case Subquery(alias, child) =>
       toSQL(child).map(childSQL => s"($childSQL) AS $alias")

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 612f01c..07a3528 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -216,7 +216,7 @@ case class CreateMetastoreDataSourceAsSelect(
             sqlContext, Some(query.schema.asNullable), partitionColumns, provider, optionsWithPath)
           val createdRelation = LogicalRelation(resolved.relation)
           EliminateSubQueries(sqlContext.catalog.lookupRelation(tableIdent)) match {
-            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _) =>
+            case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               if (l.relation != createdRelation.relation) {
                 val errorDescription =
                   s"Cannot append to table $tableName because the resolved relation does not " +

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 2ee8150..0604d9f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -146,4 +146,14 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
   ignore("distinct and non-distinct aggregation") {
     checkHiveQl("SELECT a, COUNT(DISTINCT b), COUNT(DISTINCT c), SUM(d) FROM t2 GROUP BY a")
   }
+
+  test("persisted data source relations") {
+    Seq("orc", "json", "parquet").foreach { format =>
+      val tableName = s"${format}_t0"
+      withTable(tableName) {
+        sqlContext.range(10).write.format(format).saveAsTable(tableName)
+        checkHiveQl(s"SELECT id FROM $tableName")
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 202851a..253f13c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -571,7 +571,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             Row(3) :: Row(4) :: Nil)
 
           table("test_parquet_ctas").queryExecution.optimizedPlan match {
-            case LogicalRelation(p: ParquetRelation, _) => // OK
+            case LogicalRelation(p: ParquetRelation, _, _) => // OK
             case _ =>
               fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation]}")
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 593fac2..f6c687a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -268,7 +268,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
       val relation = EliminateSubQueries(catalog.lookupRelation(TableIdentifier(tableName)))
       relation match {
-        case LogicalRelation(r: ParquetRelation, _) =>
+        case LogicalRelation(r: ParquetRelation, _, _) =>
           if (!isDataSourceParquet) {
             fail(
               s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
index 5afc7e7..c94e73c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcFilterSuite.scala
@@ -42,7 +42,7 @@ class OrcFilterSuite extends QueryTest with OrcTest {
 
     var maybeRelation: Option[OrcRelation] = None
     val maybeAnalyzedPredicate = query.queryExecution.optimizedPlan.collect {
-      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: OrcRelation, _)) =>
+      case PhysicalOperation(_, filters, LogicalRelation(orcRelation: OrcRelation, _, _)) =>
         maybeRelation = Some(orcRelation)
         filters
     }.flatten.reduceLeftOption(_ && _)

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2ceb836..ed544c6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -282,7 +282,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       )
 
       table("test_parquet_ctas").queryExecution.optimizedPlan match {
-        case LogicalRelation(_: ParquetRelation, _) => // OK
+        case LogicalRelation(_: ParquetRelation, _, _) => // OK
         case _ => fail(
           "test_parquet_ctas should be converted to " +
               s"${classOf[ParquetRelation].getCanonicalName }")
@@ -369,7 +369,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
 
       assertResult(2) {
         analyzed.collect {
-          case r @ LogicalRelation(_: ParquetRelation, _) => r
+          case r @ LogicalRelation(_: ParquetRelation, _, _) => r
         }.size
       }
     }
@@ -378,7 +378,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   def collectParquetRelation(df: DataFrame): ParquetRelation = {
     val plan = df.queryExecution.analyzed
     plan.collectFirst {
-      case LogicalRelation(r: ParquetRelation, _) => r
+      case LogicalRelation(r: ParquetRelation, _, _) => r
     }.getOrElse {
       fail(s"Expecting a ParquetRelation2, but got:\n$plan")
     }
@@ -428,7 +428,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       // Converted test_parquet should be cached.
       catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => fail("Converted test_parquet should be cached in the cache.")
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation, _) => // OK
+        case logical @ LogicalRelation(parquetRelation: ParquetRelation, _, _) => // OK
         case other =>
           fail(
             "The cached test_parquet should be a Parquet Relation. " +

http://git-wip-us.apache.org/repos/asf/spark/blob/8ed5f12d/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index efbf998..3f9ecf6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -500,7 +500,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       }
 
       val actualPaths = df.queryExecution.analyzed.collectFirst {
-        case LogicalRelation(relation: HadoopFsRelation, _) =>
+        case LogicalRelation(relation: HadoopFsRelation, _, _) =>
           relation.paths.toSet
       }.getOrElse {
         fail("Expect an FSBasedRelation, but none could be found")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org