Posted to commits@spark.apache.org by we...@apache.org on 2020/03/31 15:35:12 UTC

[spark] branch branch-3.0 updated: [SPARK-31230][SQL] Use statement plans in DataFrameWriter(V2)

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 20bb334  [SPARK-31230][SQL] Use statement plans in DataFrameWriter(V2)
20bb334 is described below

commit 20bb33453f85aeb5d2448252a9dd23d3ab85d251
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Mar 31 23:19:46 2020 +0800

    [SPARK-31230][SQL] Use statement plans in DataFrameWriter(V2)
    
    ### What changes were proposed in this pull request?
    
    Create statement plans in `DataFrameWriter` and `DataFrameWriterV2`, just as the SQL API does.
    
    ### Why are the changes needed?
    
    It's better to leave all catalog and table resolution work to the analyzer, so that these writer APIs and the SQL commands share a single resolution path.
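    
    For reference, here is the shape of the change in `DataFrameWriterV2.create()`: instead of
    resolving the catalog and identifier eagerly, the writer now emits an unresolved
    `CreateTableAsSelectStatement` and lets the `ResolveCatalogs` / `ResolveSessionCatalog`
    rules turn it into a resolved `CreateTableAsSelect` during analysis. A condensed sketch
    of the new construction (the per-argument comments are added here for readability and
    are not part of the patch):
    
    ```scala
    CreateTableAsSelectStatement(
      tableName,                          // unresolved multi-part name, e.g. Seq("testcat", "db", "tbl")
      logicalPlan,                        // the DataFrame's query plan
      partitioning.getOrElse(Seq.empty),
      None,                               // bucketSpec
      properties.toMap,
      provider,                           // e.g. Some("parquet")
      Map.empty,                          // options (populated from the SQL OPTIONS clause)
      None,                               // location
      None,                               // comment
      options.toMap,                      // writeOptions, kept separate from table options
      ifNotExists = false)
    ```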
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
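    
    For illustration, these are the two user-facing entry points exercised by those suites
    (the catalog and table names are hypothetical and assume a v2 catalog named `testcat`
    is configured):
    
    ```scala
    // Both paths now emit statement plans that the analyzer resolves,
    // exactly like the SQL CTAS / RTAS commands:
    df.write.format("parquet").mode("overwrite").saveAsTable("testcat.db.tbl")
    df.writeTo("testcat.db.tbl").using("parquet").createOrReplace()
    ```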
    
    Closes #27992 from cloud-fan/statement.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 8b01473e8bffe349b1ed993b61420d7d68896cd8)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |  8 ++--
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  4 +-
 .../sql/catalyst/plans/logical/statements.scala    |  2 +
 .../apache/spark/sql/connector/InMemoryTable.scala |  1 +
 .../org/apache/spark/sql/DataFrameWriter.scala     | 55 ++++++++++++----------
 .../org/apache/spark/sql/DataFrameWriterV2.scala   | 43 ++++++++---------
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  8 ++--
 .../execution/command/PlanResolutionSuite.scala    |  4 +-
 8 files changed, 66 insertions(+), 59 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 895dfbb..403e4e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -134,7 +134,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         ignoreIfExists = c.ifNotExists)
 
     case c @ CreateTableAsSelectStatement(
-         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
       CreateTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
@@ -142,7 +142,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         c.partitioning ++ c.bucketSpec.map(_.asTransform),
         c.asSelect,
         convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
-        writeOptions = c.options,
+        writeOptions = c.writeOptions,
         ignoreIfExists = c.ifNotExists)
 
     case RefreshTableStatement(NonSessionCatalogAndTable(catalog, tbl)) =>
@@ -161,7 +161,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         orCreate = c.orCreate)
 
     case c @ ReplaceTableAsSelectStatement(
-         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+         NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
       ReplaceTableAsSelect(
         catalog.asTableCatalog,
         tbl.asIdentifier,
@@ -169,7 +169,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         c.partitioning ++ c.bucketSpec.map(_.asTransform),
         c.asSelect,
         convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
-        writeOptions = c.options,
+        writeOptions = c.writeOptions,
         orCreate = c.orCreate)
 
     case DropTableStatement(NonSessionCatalogAndTable(catalog, tbl), ifExists, _) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 09d316b6..cd4c895 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2779,7 +2779,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       case Some(query) =>
         CreateTableAsSelectStatement(
           table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
-          ifNotExists = ifNotExists)
+          writeOptions = Map.empty, ifNotExists = ifNotExists)
 
       case None if temp =>
         // CREATE TEMPORARY TABLE ... USING ... is not supported by the catalyst parser.
@@ -2834,7 +2834,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
 
       case Some(query) =>
         ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
-          provider, options, location, comment, orCreate = orCreate)
+          provider, options, location, comment, writeOptions = Map.empty, orCreate = orCreate)
 
       case _ =>
         ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 50c3cd0..7d7aae2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -83,6 +83,7 @@ case class CreateTableAsSelectStatement(
     options: Map[String, String],
     location: Option[String],
     comment: Option[String],
+    writeOptions: Map[String, String],
     ifNotExists: Boolean) extends ParsedStatement {
 
   override def children: Seq[LogicalPlan] = Seq(asSelect)
@@ -133,6 +134,7 @@ case class ReplaceTableAsSelectStatement(
     options: Map[String, String],
     location: Option[String],
     comment: Option[String],
+    writeOptions: Map[String, String],
     orCreate: Boolean) extends ParsedStatement {
 
   override def children: Seq[LogicalPlan] = Seq(asSelect)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index 0187ae3..3d7026e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -116,6 +116,7 @@ class InMemoryTable(
   }
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    InMemoryTable.maybeSimulateFailedTableWrite(new CaseInsensitiveStringMap(properties))
     InMemoryTable.maybeSimulateFailedTableWrite(info.options)
 
     new WriteBuilder with SupportsTruncate with SupportsOverwrite with SupportsDynamicOverwrite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7e669e0..d11e4db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,18 +26,17 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, SupportsWrite, Table, TableCatalog, TableProvider, V1Table}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
 import org.apache.spark.sql.connector.catalog.TableCapability._
-import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LiteralValue, Transform}
+import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 /**
@@ -574,12 +573,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val canUseV2 = lookupV2Provider().isDefined
 
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
-      case NonSessionCatalogAndIdentifier(catalog, ident) =>
-        saveAsTable(catalog.asTableCatalog, ident)
+      case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
+        saveAsTable(catalog.asTableCatalog, ident, nameParts)
 
-      case SessionCatalogAndIdentifier(catalog, ident)
+      case nameParts @ SessionCatalogAndIdentifier(catalog, ident)
           if canUseV2 && ident.namespace().length <= 1 =>
-        saveAsTable(catalog.asTableCatalog, ident)
+        saveAsTable(catalog.asTableCatalog, ident, nameParts)
 
       case AsTableIdentifier(tableIdentifier) =>
         saveAsTable(tableIdentifier)
@@ -591,16 +590,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   }
 
 
-  private def saveAsTable(catalog: TableCatalog, ident: Identifier): Unit = {
+  private def saveAsTable(
+      catalog: TableCatalog, ident: Identifier, nameParts: Seq[String]): Unit = {
     val tableOpt = try Option(catalog.loadTable(ident)) catch {
       case _: NoSuchTableException => None
     }
 
-    def getLocationIfExists: Option[(String, String)] = {
-      val opts = CaseInsensitiveMap(extraOptions.toMap)
-      opts.get("path").map(TableCatalog.PROP_LOCATION -> _)
-    }
-
     val command = (mode, tableOpt) match {
       case (_, Some(_: V1Table)) =>
         return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption))
@@ -611,12 +606,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap)
 
       case (SaveMode.Overwrite, _) =>
-        ReplaceTableAsSelect(
-          catalog,
-          ident,
-          partitioningAsV2,
+        ReplaceTableAsSelectStatement(
+          nameParts,
           df.queryExecution.analyzed,
-          Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists,
+          partitioningAsV2,
+          None,
+          Map.empty,
+          Some(source),
+          Map.empty,
+          extraOptions.get("path"),
+          extraOptions.get(TableCatalog.PROP_COMMENT),
           extraOptions.toMap,
           orCreate = true)      // Create the table if it doesn't exist
 
@@ -624,14 +623,18 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         // We have a potential race condition here in AppendMode, if the table suddenly gets
         // created between our existence check and physical execution, but this can't be helped
         // in any case.
-        CreateTableAsSelect(
-          catalog,
-          ident,
-          partitioningAsV2,
+        CreateTableAsSelectStatement(
+          nameParts,
           df.queryExecution.analyzed,
-          Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists,
+          partitioningAsV2,
+          None,
+          Map.empty,
+          Some(source),
+          Map.empty,
+          extraOptions.get("path"),
+          extraOptions.get(TableCatalog.PROP_COMMENT),
           extraOptions.toMap,
-          ignoreIfExists = other == SaveMode.Ignore)
+          ifNotExists = other == SaveMode.Ignore)
     }
 
     runCommand(df.sparkSession, "saveAsTable") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 45a9b28..15ff7fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -23,8 +23,7 @@ import scala.collection.mutable
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -47,8 +46,6 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
 
   private val sparkSession = ds.sparkSession
 
-  private val catalogManager = sparkSession.sessionState.analyzer.catalogManager
-
   private val tableName = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(table)
 
   private val (catalog, identifier) = {
@@ -120,19 +117,19 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
   }
 
   override def create(): Unit = {
-    // create and replace could alternatively create ParsedPlan statements, like
-    // `CreateTableFromDataFrameStatement(UnresolvedRelation(tableName), ...)`, to keep the catalog
-    // resolution logic in the analyzer.
     runCommand("create") {
-      CreateTableAsSelect(
-        catalog,
-        identifier,
-        partitioning.getOrElse(Seq.empty),
+      CreateTableAsSelectStatement(
+        tableName,
         logicalPlan,
-        properties = provider.map(p => properties + (TableCatalog.PROP_PROVIDER -> p))
-          .getOrElse(properties).toMap,
-        writeOptions = options.toMap,
-        ignoreIfExists = false)
+        partitioning.getOrElse(Seq.empty),
+        None,
+        properties.toMap,
+        provider,
+        Map.empty,
+        None,
+        None,
+        options.toMap,
+        ifNotExists = false)
     }
   }
 
@@ -231,13 +228,17 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
 
   private def internalReplace(orCreate: Boolean): Unit = {
     runCommand("replace") {
-      ReplaceTableAsSelect(
-        catalog,
-        identifier,
-        partitioning.getOrElse(Seq.empty),
+      ReplaceTableAsSelectStatement(
+        tableName,
         logicalPlan,
-        properties = provider.map(p => properties + ("provider" -> p)).getOrElse(properties).toMap,
-        writeOptions = options.toMap,
+        partitioning.getOrElse(Seq.empty),
+        None,
+        properties.toMap,
+        provider,
+        Map.empty,
+        None,
+        None,
+        options.toMap,
         orCreate = orCreate)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7b88fc6..77c5701 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -267,7 +267,7 @@ class ResolveSessionCatalog(
       }
 
     case c @ CreateTableAsSelectStatement(
-         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
@@ -283,7 +283,7 @@ class ResolveSessionCatalog(
           c.partitioning ++ c.bucketSpec.map(_.asTransform),
           c.asSelect,
           convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
-          writeOptions = c.options,
+          writeOptions = c.writeOptions,
           ignoreIfExists = c.ifNotExists)
       }
 
@@ -311,7 +311,7 @@ class ResolveSessionCatalog(
       }
 
     case c @ ReplaceTableAsSelectStatement(
-         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
       val provider = c.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
         throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")
@@ -323,7 +323,7 @@ class ResolveSessionCatalog(
           c.partitioning ++ c.bucketSpec.map(_.asTransform),
           c.asSelect,
           convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
-          writeOptions = c.options,
+          writeOptions = c.writeOptions,
           orCreate = c.orCreate)
       }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 2e88ea9..8a3e1bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -552,7 +552,7 @@ class PlanResolutionSuite extends AnalysisTest {
         assert(ctas.catalog.name == "testcat")
         assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name"))
         assert(ctas.properties == expectedProperties)
-        assert(ctas.writeOptions == Map("other" -> "20"))
+        assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
         assert(ctas.ignoreIfExists)
 
@@ -586,7 +586,7 @@ class PlanResolutionSuite extends AnalysisTest {
         assert(ctas.catalog.name == "testcat")
         assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name"))
         assert(ctas.properties == expectedProperties)
-        assert(ctas.writeOptions == Map("other" -> "20"))
+        assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
         assert(ctas.ignoreIfExists)
 

