Posted to commits@spark.apache.org by we...@apache.org on 2020/03/31 15:21:04 UTC
[spark] branch master updated: [SPARK-31230][SQL] Use statement plans in DataFrameWriter(V2)
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8b01473 [SPARK-31230][SQL] Use statement plans in DataFrameWriter(V2)
8b01473 is described below
commit 8b01473e8bffe349b1ed993b61420d7d68896cd8
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Mar 31 23:19:46 2020 +0800
[SPARK-31230][SQL] Use statement plans in DataFrameWriter(V2)
### What changes were proposed in this pull request?
Create statement plans in `DataFrameWriter(V2)`, like the SQL API.
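As a quick illustration, here is a trimmed sketch of the `DataFrameWriterV2.create()` change in the diff below: the writer now builds an unresolved `CreateTableAsSelectStatement` carrying the raw multi-part table name, instead of a pre-resolved `CreateTableAsSelect` with a `TableCatalog` and `Identifier` already looked up.

```scala
// Simplified from the DataFrameWriterV2 change below; the multi-part name is
// passed through unresolved, and the analyzer later picks the catalog/identifier.
override def create(): Unit = {
  runCommand("create") {
    CreateTableAsSelectStatement(
      tableName,                          // Seq[String], e.g. Seq("testcat", "ns", "t")
      logicalPlan,                        // the DataFrame's logical plan, used as asSelect
      partitioning.getOrElse(Seq.empty),
      None,                               // bucketSpec
      properties.toMap,
      provider,                           // e.g. Some("parquet")
      Map.empty,                          // options
      None,                               // location
      None,                               // comment
      options.toMap,                      // writeOptions
      ifNotExists = false)
  }
}
```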
### Why are the changes needed?
It's better to leave all the resolution work to the analyzer.
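For illustration, a trimmed excerpt of the corresponding analyzer side (from the `ResolveCatalogs` change in the diff below): the same rule that resolves SQL statements now also resolves the statement plans produced by `DataFrameWriter(V2)`.

```scala
// Excerpt from ResolveCatalogs as changed below: the unresolved statement is
// matched and converted into a resolved CreateTableAsSelect, with the new
// dedicated writeOptions field used instead of c.options.
case c @ CreateTableAsSelectStatement(
    NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
  CreateTableAsSelect(
    catalog.asTableCatalog,
    tbl.asIdentifier,
    c.partitioning ++ c.bucketSpec.map(_.asTransform),
    c.asSelect,
    convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
    writeOptions = c.writeOptions,
    ignoreIfExists = c.ifNotExists)
```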
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #27992 from cloud-fan/statement.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/analysis/ResolveCatalogs.scala | 8 ++--
.../spark/sql/catalyst/parser/AstBuilder.scala | 4 +-
.../sql/catalyst/plans/logical/statements.scala | 2 +
.../apache/spark/sql/connector/InMemoryTable.scala | 1 +
.../org/apache/spark/sql/DataFrameWriter.scala | 55 ++++++++++++----------
.../org/apache/spark/sql/DataFrameWriterV2.scala | 43 ++++++++---------
.../catalyst/analysis/ResolveSessionCatalog.scala | 8 ++--
.../execution/command/PlanResolutionSuite.scala | 4 +-
8 files changed, 66 insertions(+), 59 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 463793e..2a0a944 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -156,7 +156,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
ignoreIfExists = c.ifNotExists)
case c @ CreateTableAsSelectStatement(
- NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
CreateTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -164,7 +164,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
- writeOptions = c.options,
+ writeOptions = c.writeOptions,
ignoreIfExists = c.ifNotExists)
case RefreshTableStatement(NonSessionCatalogAndTable(catalog, tbl)) =>
@@ -183,7 +183,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
orCreate = c.orCreate)
case c @ ReplaceTableAsSelectStatement(
- NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
ReplaceTableAsSelect(
catalog.asTableCatalog,
tbl.asIdentifier,
@@ -191,7 +191,7 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c.properties, c.options, c.location, c.comment, c.provider),
- writeOptions = c.options,
+ writeOptions = c.writeOptions,
orCreate = c.orCreate)
case DropTableStatement(NonSessionCatalogAndTable(catalog, tbl), ifExists, _) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 0f0ee80..cc41863 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2779,7 +2779,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
case Some(query) =>
CreateTableAsSelectStatement(
table, query, partitioning, bucketSpec, properties, provider, options, location, comment,
- ifNotExists = ifNotExists)
+ writeOptions = Map.empty, ifNotExists = ifNotExists)
case None if temp =>
// CREATE TEMPORARY TABLE ... USING ... is not supported by the catalyst parser.
@@ -2834,7 +2834,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
case Some(query) =>
ReplaceTableAsSelectStatement(table, query, partitioning, bucketSpec, properties,
- provider, options, location, comment, orCreate = orCreate)
+ provider, options, location, comment, writeOptions = Map.empty, orCreate = orCreate)
case _ =>
ReplaceTableStatement(table, schema.getOrElse(new StructType), partitioning,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index aa1e9cc..b1129e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -83,6 +83,7 @@ case class CreateTableAsSelectStatement(
options: Map[String, String],
location: Option[String],
comment: Option[String],
+ writeOptions: Map[String, String],
ifNotExists: Boolean) extends ParsedStatement {
override def children: Seq[LogicalPlan] = Seq(asSelect)
@@ -133,6 +134,7 @@ case class ReplaceTableAsSelectStatement(
options: Map[String, String],
location: Option[String],
comment: Option[String],
+ writeOptions: Map[String, String],
orCreate: Boolean) extends ParsedStatement {
override def children: Seq[LogicalPlan] = Seq(asSelect)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index 0187ae3..3d7026e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -116,6 +116,7 @@ class InMemoryTable(
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ InMemoryTable.maybeSimulateFailedTableWrite(new CaseInsensitiveStringMap(properties))
InMemoryTable.maybeSimulateFailedTableWrite(info.options)
new WriteBuilder with SupportsTruncate with SupportsOverwrite with SupportsDynamicOverwrite {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7e669e0..d11e4db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -26,18 +26,17 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, SupportsWrite, Table, TableCatalog, TableProvider, V1Table}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, CreateTableAsSelectStatement, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, CatalogV2Implicits, CatalogV2Util, Identifier, SupportsCatalogOptions, Table, TableCatalog, TableProvider, V1Table}
import org.apache.spark.sql.connector.catalog.TableCapability._
-import org.apache.spark.sql.connector.expressions.{BucketTransform, FieldReference, IdentityTransform, LiteralValue, Transform}
+import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.sources.BaseRelation
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
/**
@@ -574,12 +573,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val canUseV2 = lookupV2Provider().isDefined
session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
- case NonSessionCatalogAndIdentifier(catalog, ident) =>
- saveAsTable(catalog.asTableCatalog, ident)
+ case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
+ saveAsTable(catalog.asTableCatalog, ident, nameParts)
- case SessionCatalogAndIdentifier(catalog, ident)
+ case nameParts @ SessionCatalogAndIdentifier(catalog, ident)
if canUseV2 && ident.namespace().length <= 1 =>
- saveAsTable(catalog.asTableCatalog, ident)
+ saveAsTable(catalog.asTableCatalog, ident, nameParts)
case AsTableIdentifier(tableIdentifier) =>
saveAsTable(tableIdentifier)
@@ -591,16 +590,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
- private def saveAsTable(catalog: TableCatalog, ident: Identifier): Unit = {
+ private def saveAsTable(
+ catalog: TableCatalog, ident: Identifier, nameParts: Seq[String]): Unit = {
val tableOpt = try Option(catalog.loadTable(ident)) catch {
case _: NoSuchTableException => None
}
- def getLocationIfExists: Option[(String, String)] = {
- val opts = CaseInsensitiveMap(extraOptions.toMap)
- opts.get("path").map(TableCatalog.PROP_LOCATION -> _)
- }
-
val command = (mode, tableOpt) match {
case (_, Some(_: V1Table)) =>
return saveAsTable(TableIdentifier(ident.name(), ident.namespace().headOption))
@@ -611,12 +606,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
AppendData.byName(v2Relation, df.logicalPlan, extraOptions.toMap)
case (SaveMode.Overwrite, _) =>
- ReplaceTableAsSelect(
- catalog,
- ident,
- partitioningAsV2,
+ ReplaceTableAsSelectStatement(
+ nameParts,
df.queryExecution.analyzed,
- Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists,
+ partitioningAsV2,
+ None,
+ Map.empty,
+ Some(source),
+ Map.empty,
+ extraOptions.get("path"),
+ extraOptions.get(TableCatalog.PROP_COMMENT),
extraOptions.toMap,
orCreate = true) // Create the table if it doesn't exist
@@ -624,14 +623,18 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
// We have a potential race condition here in AppendMode, if the table suddenly gets
// created between our existence check and physical execution, but this can't be helped
// in any case.
- CreateTableAsSelect(
- catalog,
- ident,
- partitioningAsV2,
+ CreateTableAsSelectStatement(
+ nameParts,
df.queryExecution.analyzed,
- Map(TableCatalog.PROP_PROVIDER -> source) ++ getLocationIfExists,
+ partitioningAsV2,
+ None,
+ Map.empty,
+ Some(source),
+ Map.empty,
+ extraOptions.get("path"),
+ extraOptions.get(TableCatalog.PROP_COMMENT),
extraOptions.toMap,
- ignoreIfExists = other == SaveMode.Ignore)
+ ifNotExists = other == SaveMode.Ignore)
}
runCommand(df.sparkSession, "saveAsTable") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 45a9b28..15ff7fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -23,8 +23,7 @@ import scala.collection.mutable
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.TableCatalog
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelectStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelectStatement}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -47,8 +46,6 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
private val sparkSession = ds.sparkSession
- private val catalogManager = sparkSession.sessionState.analyzer.catalogManager
-
private val tableName = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(table)
private val (catalog, identifier) = {
@@ -120,19 +117,19 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
}
override def create(): Unit = {
- // create and replace could alternatively create ParsedPlan statements, like
- // `CreateTableFromDataFrameStatement(UnresolvedRelation(tableName), ...)`, to keep the catalog
- // resolution logic in the analyzer.
runCommand("create") {
- CreateTableAsSelect(
- catalog,
- identifier,
- partitioning.getOrElse(Seq.empty),
+ CreateTableAsSelectStatement(
+ tableName,
logicalPlan,
- properties = provider.map(p => properties + (TableCatalog.PROP_PROVIDER -> p))
- .getOrElse(properties).toMap,
- writeOptions = options.toMap,
- ignoreIfExists = false)
+ partitioning.getOrElse(Seq.empty),
+ None,
+ properties.toMap,
+ provider,
+ Map.empty,
+ None,
+ None,
+ options.toMap,
+ ifNotExists = false)
}
}
@@ -231,13 +228,17 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
private def internalReplace(orCreate: Boolean): Unit = {
runCommand("replace") {
- ReplaceTableAsSelect(
- catalog,
- identifier,
- partitioning.getOrElse(Seq.empty),
+ ReplaceTableAsSelectStatement(
+ tableName,
logicalPlan,
- properties = provider.map(p => properties + ("provider" -> p)).getOrElse(properties).toMap,
- writeOptions = options.toMap,
+ partitioning.getOrElse(Seq.empty),
+ None,
+ properties.toMap,
+ provider,
+ Map.empty,
+ None,
+ None,
+ options.toMap,
orCreate = orCreate)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index f891eca..476f155 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -291,7 +291,7 @@ class ResolveSessionCatalog(
}
case c @ CreateTableAsSelectStatement(
- SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
@@ -307,7 +307,7 @@ class ResolveSessionCatalog(
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
- writeOptions = c.options,
+ writeOptions = c.writeOptions,
ignoreIfExists = c.ifNotExists)
}
@@ -335,7 +335,7 @@ class ResolveSessionCatalog(
}
case c @ ReplaceTableAsSelectStatement(
- SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _) =>
+ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _) =>
val provider = c.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
throw new AnalysisException("REPLACE TABLE AS SELECT is only supported with v2 tables.")
@@ -347,7 +347,7 @@ class ResolveSessionCatalog(
c.partitioning ++ c.bucketSpec.map(_.asTransform),
c.asSelect,
convertTableProperties(c.properties, c.options, c.location, c.comment, Some(provider)),
- writeOptions = c.options,
+ writeOptions = c.writeOptions,
orCreate = c.orCreate)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 22089b4..2d6a5da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -552,7 +552,7 @@ class PlanResolutionSuite extends AnalysisTest {
assert(ctas.catalog.name == "testcat")
assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name"))
assert(ctas.properties == expectedProperties)
- assert(ctas.writeOptions == Map("other" -> "20"))
+ assert(ctas.writeOptions.isEmpty)
assert(ctas.partitioning.isEmpty)
assert(ctas.ignoreIfExists)
@@ -586,7 +586,7 @@ class PlanResolutionSuite extends AnalysisTest {
assert(ctas.catalog.name == "testcat")
assert(ctas.tableName == Identifier.of(Array("mydb"), "table_name"))
assert(ctas.properties == expectedProperties)
- assert(ctas.writeOptions == Map("other" -> "20"))
+ assert(ctas.writeOptions.isEmpty)
assert(ctas.partitioning.isEmpty)
assert(ctas.ignoreIfExists)