Posted to commits@spark.apache.org by br...@apache.org on 2019/09/18 16:28:18 UTC
[spark] branch master updated: [SPARK-29030][SQL] Simplify lookupV2Relation
This is an automated email from the ASF dual-hosted git repository.
brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ee94b5d [SPARK-29030][SQL] Simplify lookupV2Relation
ee94b5d is described below
commit ee94b5d7019f8ec181d42e953cb8b5190186fe30
Author: John Zhuge <jz...@apache.org>
AuthorDate: Wed Sep 18 09:27:11 2019 -0700
[SPARK-29030][SQL] Simplify lookupV2Relation
## What changes were proposed in this pull request?
Simplify the return type of `lookupV2Relation`, which makes its three callers more straightforward.
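For context, the signature change at the heart of the patch, copied from the diff below (declarations only, bodies omitted):

```scala
// Before: callers had to pattern-match on the Either and then unwrap an
// Option[Table] from whichever side was returned.
private def lookupV2Relation(
    identifier: Seq[String]
): Either[(CatalogPlugin, Identifier, Option[Table]), Option[Table]]

// After: one helper returns the resolved relation together with its catalog
// and identifier, and a thin wrapper keeps the simple lookup for callers
// that only need the relation.
private def lookupV2RelationAndCatalog(
    identifier: Seq[String]): Option[(DataSourceV2Relation, CatalogPlugin, Identifier)]

private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
  lookupV2RelationAndCatalog(identifier).map(_._1)
```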
## How was this patch tested?
Existing unit tests.
Closes #25735 from jzhuge/lookupv2relation.
Authored-by: John Zhuge <jz...@apache.org>
Signed-off-by: Burak Yavuz <br...@gmail.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 87 ++++++++--------------
.../sql/connector/catalog/CatalogV2Implicits.scala | 6 +-
2 files changed, 37 insertions(+), 56 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0a13a34..76e59fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.sql._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.toPrettySQL
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, Table, TableCatalog, TableChange, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, Table, TableCatalog, TableChange, V1Table}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
@@ -666,20 +666,13 @@ class Analyzer(
object ResolveTables extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case u: UnresolvedRelation =>
- val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match {
- case scala.Left((_, _, tableOpt)) => tableOpt
- case scala.Right(tableOpt) => tableOpt
- }
- v2TableOpt.map(DataSourceV2Relation.create).getOrElse(u)
+ lookupV2Relation(u.multipartIdentifier)
+ .getOrElse(u)
case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
- val v2TableOpt = lookupV2Relation(u.multipartIdentifier) match {
- case scala.Left((_, _, tableOpt)) => tableOpt
- case scala.Right(tableOpt) => tableOpt
- }
- v2TableOpt.map(DataSourceV2Relation.create).map { v2Relation =>
- i.copy(table = v2Relation)
- }.getOrElse(i)
+ lookupV2Relation(u.multipartIdentifier)
+ .map(v2Relation => i.copy(table = v2Relation))
+ .getOrElse(i)
}
}
@@ -963,26 +956,13 @@ class Analyzer(
private def resolveV2Alter(
tableName: Seq[String],
changes: Seq[TableChange]): Option[AlterTable] = {
- lookupV2Relation(tableName) match {
- case scala.Left((v2Catalog, ident, tableOpt)) =>
- Some(AlterTable(
- v2Catalog.asTableCatalog,
- ident,
- tableOpt.map(DataSourceV2Relation.create).getOrElse(UnresolvedRelation(tableName)),
- changes
- ))
- case scala.Right(tableOpt) =>
- tableOpt.map { table =>
- AlterTable(
- sessionCatalog.asTableCatalog,
- Identifier.of(tableName.init.toArray, tableName.last),
- DataSourceV2Relation.create(table),
- changes
- )
- }
+ lookupV2RelationAndCatalog(tableName).map {
+ case (relation, catalog, ident) =>
+ AlterTable(catalog.asTableCatalog, ident, relation, changes)
}
}
}
+
/**
* Resolve DESCRIBE TABLE statements that use a DSv2 catalog.
*
@@ -2840,36 +2820,35 @@ class Analyzer(
/**
* Performs the lookup of DataSourceV2 Tables. The order of resolution is:
- * 1. Check if this relation is a temporary table
- * 2. Check if it has a catalog identifier. Here we try to load the table. If we find the table,
- * we can return the table. The result returned by an explicit catalog will be returned on
- * the Left projection of the Either.
- * 3. Try resolving the relation using the V2SessionCatalog if that is defined. If the
- * V2SessionCatalog returns a V1 table definition (UnresolvedTable), then we return a `None`
- * on the right side so that we can fallback to the V1 code paths.
- * The basic idea is, if a value is returned on the Left, it means a v2 catalog is defined and
- * must be used to resolve the table. If a value is returned on the right, then we can try
- * creating a V2 relation if a V2 Table is defined. If it isn't defined, then we should defer
- * to V1 code paths.
+ * 1. Check if this relation is a temporary table.
+ * 2. Check if it has a catalog identifier. Here we try to load the table.
+ * If we find the table, return the v2 relation and catalog.
+ * 3. Try resolving the relation using the V2SessionCatalog if that is defined.
+ * If the V2SessionCatalog returns a V1 table definition,
+ * return `None` so that we can fallback to the V1 code paths.
+ * If the V2SessionCatalog returns a V2 table, return the v2 relation and V2SessionCatalog.
*/
- private def lookupV2Relation(
- identifier: Seq[String]
- ): Either[(CatalogPlugin, Identifier, Option[Table]), Option[Table]] = {
- import org.apache.spark.sql.connector.catalog.CatalogV2Util._
-
+ private def lookupV2RelationAndCatalog(
+ identifier: Seq[String]): Option[(DataSourceV2Relation, CatalogPlugin, Identifier)] =
identifier match {
- case AsTemporaryViewIdentifier(ti) if catalog.isTemporaryTable(ti) =>
- scala.Right(None)
+ case AsTemporaryViewIdentifier(ti) if catalog.isTemporaryTable(ti) => None
case CatalogObjectIdentifier(Some(v2Catalog), ident) =>
- scala.Left((v2Catalog, ident, loadTable(v2Catalog, ident)))
+ CatalogV2Util.loadTable(v2Catalog, ident) match {
+ case Some(table) => Some((DataSourceV2Relation.create(table), v2Catalog, ident))
+ case None => None
+ }
case CatalogObjectIdentifier(None, ident) =>
- loadTable(catalogManager.v2SessionCatalog, ident) match {
- case Some(_: V1Table) => scala.Right(None)
- case other => scala.Right(other)
+ CatalogV2Util.loadTable(catalogManager.v2SessionCatalog, ident) match {
+ case Some(_: V1Table) => None
+ case Some(table) =>
+ Some((DataSourceV2Relation.create(table), catalogManager.v2SessionCatalog, ident))
+ case None => None
}
- case _ => scala.Right(None)
+ case _ => None
}
- }
+
+ private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
+ lookupV2RelationAndCatalog(identifier).map(_._1)
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 7d0acce..031e058 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -92,8 +92,10 @@ private[sql] object CatalogV2Implicits {
}
}
- implicit class MultipartIdentifierHelper(namespace: Seq[String]) {
- def quoted: String = namespace.map(quote).mkString(".")
+ implicit class MultipartIdentifierHelper(parts: Seq[String]) {
+ def quoted: String = parts.map(quote).mkString(".")
+
+ def asIdentifier: Identifier = Identifier.of(parts.init.toArray, parts.last)
}
private def quote(part: String): String = {
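The second file adds an `asIdentifier` helper to `MultipartIdentifierHelper`. A minimal usage sketch follows; note that `CatalogV2Implicits` is `private[sql]`, so this only compiles inside the `org.apache.spark.sql` package, and the `parts` value is an illustrative example rather than code from the patch:

```scala
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.Identifier

// A multi-part name with the catalog part already stripped, e.g. "db.tbl".
val parts: Seq[String] = Seq("db", "tbl")

// quoted joins the parts with dots, quoting each part as needed.
val name: String = parts.quoted            // "db.tbl"

// asIdentifier treats everything but the last part as the namespace.
val ident: Identifier = parts.asIdentifier // Identifier.of(Array("db"), "tbl")
```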