You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/23 05:04:44 UTC
[spark] branch master updated: [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e5b7fb85b2d [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1
e5b7fb85b2d is described below
commit e5b7fb85b2d91f2e84dc60888c94e15b53751078
Author: Yikf <yi...@gmail.com>
AuthorDate: Thu Jun 23 13:04:05 2022 +0800
[SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1
### What changes were proposed in this pull request?
Options set through DataFrameWriterV2 should be passed to the table's storage properties when the write falls back to a v1 data source. This lets settings such as the compression codec take effect.
### Why are the changes needed?
Example:
`spark.range(0, 100).writeTo("t1").option("compression", "zstd").using("parquet").create`
**Before** (the `compression` option is ignored), generated files look like:
part-00000-644a65ed-0e7a-43d5-8d30-b610a0fb19dc-c000.**snappy**.parquet ...
**After**, generated files look like:
part-00000-6eb9d1ae-8fdb-4428-aea3-bd6553954cdd-c000.**zstd**.parquet ...
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a new test to DataFrameWriterV2Suite.
Closes #36941 from Yikf/writeV2option.
Authored-by: Yikf <yi...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../catalyst/analysis/ResolveSessionCatalog.scala | 8 ++++++--
.../apache/spark/sql/DataFrameWriterV2Suite.scala | 20 ++++++++++++++++++++
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index f4df3bea532..41b0599848e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -161,11 +161,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c
}
- case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _)
+ case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, writeOptions, _)
if isSessionCatalog(catalog) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
- c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
+ c.tableSpec.provider,
+ c.tableSpec.options ++ writeOptions,
+ c.tableSpec.location,
+ c.tableSpec.serde,
ctas = true)
+
if (!isV2Provider(provider)) {
constructV1TableCmd(Some(c.query), c.tableSpec, name, new StructType, c.partitioning,
c.ignoreIfExists, storageFormat, provider)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 8aef27a1b66..86108a81da8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -23,12 +23,15 @@ import scala.collection.JavaConverters._
import org.scalatest.BeforeAndAfter
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.connector.InMemoryV1Provider
import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, TableCatalog}
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.FakeSourceOne
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
@@ -531,6 +534,23 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
}
+ test("SPARK-39543 writeOption should be passed to storage properties when fallback to v1") {
+ val provider = classOf[InMemoryV1Provider].getName
+
+ withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, provider)) {
+ spark.range(10)
+ .writeTo("table_name")
+ .option("compression", "zstd").option("name", "table_name")
+ .using(provider)
+ .create()
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table_name"))
+
+ assert(table.identifier === TableIdentifier("table_name", Some("default")))
+ assert(table.storage.properties.contains("compression"))
+ assert(table.storage.properties.getOrElse("compression", "foo") == "zstd")
+ }
+ }
+
test("Replace: basic behavior") {
spark.sql(
"CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org