You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by br...@apache.org on 2019/09/18 16:28:18 UTC

[spark] branch master updated: [SPARK-29030][SQL] Simplify lookupV2Relation

This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ee94b5d  [SPARK-29030][SQL] Simplify lookupV2Relation
ee94b5d is described below

commit ee94b5d7019f8ec181d42e953cb8b5190186fe30
Author: John Zhuge <jz...@apache.org>
AuthorDate: Wed Sep 18 09:27:11 2019 -0700

    [SPARK-29030][SQL] Simplify lookupV2Relation
    
    ## What changes were proposed in this pull request?
    
    Simplify the return type of `lookupV2Relation`, which makes its 3 callers more straightforward.
    
    ## How was this patch tested?
    
    Existing unit tests.
    
    Closes #25735 from jzhuge/lookupv2relation.
    
    Authored-by: John Zhuge <jz...@apache.org>
    Signed-off-by: Burak Yavuz <br...@gmail.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 87 ++++++++--------------
 .../sql/connector/catalog/CatalogV2Implicits.scala |  6 +-
 2 files changed, 37 insertions(+), 56 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0a13a34..76e59fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.sql._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.catalyst.util.toPrettySQL
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, Table, TableCatalog, TableChange, V1Table}
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.SQLConf
@@ -666,20 +666,13 @@ class Analyzer(
   object ResolveTables extends Rule[LogicalPlan] {
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
       case u: UnresolvedRelation =>
-        val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match {
-          case scala.Left((_, _, tableOpt)) => tableOpt
-          case scala.Right(tableOpt) => tableOpt
-        }
-        v2TableOpt.map(DataSourceV2Relation.create).getOrElse(u)
+        lookupV2Relation(u.multipartIdentifier)
+          .getOrElse(u)
 
       case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
-        val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match {
-          case scala.Left((_, _, tableOpt)) => tableOpt
-          case scala.Right(tableOpt) => tableOpt
-        }
-        v2TableOpt.map(DataSourceV2Relation.create).map { v2Relation =>
-          i.copy(table = v2Relation)
-        }.getOrElse(i)
+        lookupV2Relation(u.multipartIdentifier)
+          .map(v2Relation => i.copy(table = v2Relation))
+          .getOrElse(i)
     }
   }
 
@@ -963,26 +956,13 @@ class Analyzer(
     private def resolveV2Alter(
         tableName: Seq[String],
         changes: Seq[TableChange]): Option[AlterTable] = {
-      lookupV2Relation(tableName) match {
-        case scala.Left((v2Catalog, ident, tableOpt)) =>
-          Some(AlterTable(
-            v2Catalog.asTableCatalog,
-            ident,
-            tableOpt.map(DataSourceV2Relation.create).getOrElse(UnresolvedRelation(tableName)),
-            changes
-          ))
-        case scala.Right(tableOpt) =>
-          tableOpt.map { table =>
-            AlterTable(
-              sessionCatalog.asTableCatalog,
-              Identifier.of(tableName.init.toArray, tableName.last),
-              DataSourceV2Relation.create(table),
-              changes
-            )
-          }
+      lookupV2RelationAndCatalog(tableName).map {
+        case (relation, catalog, ident) =>
+          AlterTable(catalog.asTableCatalog, ident, relation, changes)
       }
     }
   }
+
   /**
    * Resolve DESCRIBE TABLE statements that use a DSv2 catalog.
    *
@@ -2840,36 +2820,35 @@ class Analyzer(
 
   /**
    * Performs the lookup of DataSourceV2 Tables. The order of resolution is:
-   *   1. Check if this relation is a temporary table
-   *   2. Check if it has a catalog identifier. Here we try to load the table. If we find the table,
-   *      we can return the table. The result returned by an explicit catalog will be returned on
-   *      the Left projection of the Either.
-   *   3. Try resolving the relation using the V2SessionCatalog if that is defined. If the
-   *      V2SessionCatalog returns a V1 table definition (UnresolvedTable), then we return a `None`
-   *      on the right side so that we can fallback to the V1 code paths.
-   * The basic idea is, if a value is returned on the Left, it means a v2 catalog is defined and
-   * must be used to resolve the table. If a value is returned on the right, then we can try
-   * creating a V2 relation if a V2 Table is defined. If it isn't defined, then we should defer
-   * to V1 code paths.
+   *   1. Check if this relation is a temporary table.
+   *   2. Check if it has a catalog identifier. Here we try to load the table.
+   *      If we find the table, return the v2 relation and catalog.
+   *   3. Try resolving the relation using the V2SessionCatalog if that is defined.
+   *      If the V2SessionCatalog returns a V1 table definition,
+   *      return `None` so that we can fall back to the V1 code paths.
+   *      If the V2SessionCatalog returns a V2 table, return the v2 relation and V2SessionCatalog.
    */
-  private def lookupV2Relation(
-      identifier: Seq[String]
-      ): Either[(CatalogPlugin, Identifier, Option[Table]), Option[Table]] = {
-    import org.apache.spark.sql.connector.catalog.CatalogV2Util._
-
+  private def lookupV2RelationAndCatalog(
+      identifier: Seq[String]): Option[(DataSourceV2Relation, CatalogPlugin, Identifier)] =
     identifier match {
-      case AsTemporaryViewIdentifier(ti) if catalog.isTemporaryTable(ti) =>
-        scala.Right(None)
+      case AsTemporaryViewIdentifier(ti) if catalog.isTemporaryTable(ti) => None
       case CatalogObjectIdentifier(Some(v2Catalog), ident) =>
-        scala.Left((v2Catalog, ident, loadTable(v2Catalog, ident)))
+        CatalogV2Util.loadTable(v2Catalog, ident) match {
+          case Some(table) => Some((DataSourceV2Relation.create(table), v2Catalog, ident))
+          case None => None
+        }
       case CatalogObjectIdentifier(None, ident) =>
-        loadTable(catalogManager.v2SessionCatalog, ident) match {
-          case Some(_: V1Table) => scala.Right(None)
-          case other => scala.Right(other)
+        CatalogV2Util.loadTable(catalogManager.v2SessionCatalog, ident) match {
+          case Some(_: V1Table) => None
+          case Some(table) =>
+            Some((DataSourceV2Relation.create(table), catalogManager.v2SessionCatalog, ident))
+          case None => None
         }
-      case _ => scala.Right(None)
+      case _ => None
     }
-  }
+
+  private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
+    lookupV2RelationAndCatalog(identifier).map(_._1)
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 7d0acce..031e058 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -92,8 +92,10 @@ private[sql] object CatalogV2Implicits {
     }
   }
 
-  implicit class MultipartIdentifierHelper(namespace: Seq[String]) {
-    def quoted: String = namespace.map(quote).mkString(".")
+  implicit class MultipartIdentifierHelper(parts: Seq[String]) {
+    def quoted: String = parts.map(quote).mkString(".")
+
+    def asIdentifier: Identifier = Identifier.of(parts.init.toArray, parts.last)
   }
 
   private def quote(part: String): String = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org