You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/23 05:04:44 UTC

[spark] branch master updated: [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e5b7fb85b2d [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1
e5b7fb85b2d is described below

commit e5b7fb85b2d91f2e84dc60888c94e15b53751078
Author: Yikf <yi...@gmail.com>
AuthorDate: Thu Jun 23 13:04:05 2022 +0800

    [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1
    
    ### What changes were proposed in this pull request?
    
    Options set through DataFrameWriterV2 should be passed to the table's storage properties when the write falls back to the v1 path, so that settings such as the compression codec take effect.
    
    ### Why are the changes needed?
    
    example:
    
    `spark.range(0, 100).writeTo("t1").option("compression", "zstd").using("parquet").create`
    
    **before**
    
    gen: part-00000-644a65ed-0e7a-43d5-8d30-b610a0fb19dc-c000.**snappy**.parquet ...
    
    **after**
    
    gen: part-00000-6eb9d1ae-8fdb-4428-aea3-bd6553954cdd-c000.**zstd**.parquet ...
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a new test to DataFrameWriterV2Suite.
    
    Closes #36941 from Yikf/writeV2option.
    
    Authored-by: Yikf <yi...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/analysis/ResolveSessionCatalog.scala    |  8 ++++++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala    | 20 ++++++++++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index f4df3bea532..41b0599848e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -161,11 +161,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         c
       }
 
-    case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, _, _)
+    case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, writeOptions, _)
         if isSessionCatalog(catalog) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
-        c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
+        c.tableSpec.provider,
+        c.tableSpec.options ++ writeOptions,
+        c.tableSpec.location,
+        c.tableSpec.serde,
         ctas = true)
+
       if (!isV2Provider(provider)) {
         constructV1TableCmd(Some(c.query), c.tableSpec, name, new StructType, c.partitioning,
           c.ignoreIfExists, storageFormat, provider)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 8aef27a1b66..86108a81da8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -23,12 +23,15 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.connector.InMemoryV1Provider
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.FakeSourceOne
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
@@ -531,6 +534,23 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
   }
 
+  test("SPARK-39543 writeOption should be passed to storage properties when fallback to v1") {
+    val provider = classOf[InMemoryV1Provider].getName
+
+    withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, provider)) {
+      spark.range(10)
+        .writeTo("table_name")
+        .option("compression", "zstd").option("name", "table_name")
+        .using(provider)
+        .create()
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table_name"))
+
+      assert(table.identifier === TableIdentifier("table_name", Some("default")))
+      assert(table.storage.properties.contains("compression"))
+      assert(table.storage.properties.getOrElse("compression", "foo") == "zstd")
+    }
+  }
+
   test("Replace: basic behavior") {
     spark.sql(
       "CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org