You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/23 05:07:50 UTC

[spark] branch branch-3.2 updated: [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 725ce337cb1 [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1
725ce337cb1 is described below

commit 725ce337cb1f24f666e99f7f0a3742333d116abb
Author: Yikf <yi...@gmail.com>
AuthorDate: Thu Jun 23 13:04:05 2022 +0800

    [SPARK-39543] The option of DataFrameWriterV2 should be passed to storage properties if fallback to v1
    
    Options set through DataFrameWriterV2 should be passed to the storage properties when the write falls back to v1, so that settings such as the compression codec take effect.
    
    example:
    
    `spark.range(0, 100).writeTo("t1").option("compression", "zstd").using("parquet").create`
    
    **before**
    
    gen: part-00000-644a65ed-0e7a-43d5-8d30-b610a0fb19dc-c000.**snappy**.parquet ...
    
    **after**
    
    gen: part-00000-6eb9d1ae-8fdb-4428-aea3-bd6553954cdd-c000.**zstd**.parquet ...
    
    Does this PR introduce any user-facing change? No.
    
    How was this patch tested? A new test was added to DataFrameWriterV2Suite.
    
    Closes #36941 from Yikf/writeV2option.
    
    Authored-by: Yikf <yi...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit e5b7fb85b2d91f2e84dc60888c94e15b53751078)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/analysis/ResolveSessionCatalog.scala    |  9 +++++++--
 .../apache/spark/sql/DataFrameWriterV2Suite.scala    | 20 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index b73ccbbdb5e..6694d9b5843 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -180,9 +180,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
 
     case c @ CreateTableAsSelectStatement(
-         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _, _) =>
+         SessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, writeOptions, _, _, _) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
-        c.provider, c.options, c.location, c.serde, ctas = true)
+        c.provider,
+        c.options ++ writeOptions,
+        c.location,
+        c.serde,
+        ctas = true)
+
       if (!isV2Provider(provider)) {
         val tableDesc = buildCatalogTable(tbl.asTableIdentifier, new StructType,
           c.partitioning, c.bucketSpec, c.properties, provider, c.location,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index 8aef27a1b66..86108a81da8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -23,12 +23,15 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfter
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
+import org.apache.spark.sql.connector.InMemoryV1Provider
 import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryTable, InMemoryTableCatalog, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.FakeSourceOne
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
@@ -531,6 +534,23 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
   }
 
+  test("SPARK-39543 writeOption should be passed to storage properties when fallback to v1") {
+    val provider = classOf[InMemoryV1Provider].getName
+
+    withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, provider)) {
+      spark.range(10)
+        .writeTo("table_name")
+        .option("compression", "zstd").option("name", "table_name")
+        .using(provider)
+        .create()
+      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("table_name"))
+
+      assert(table.identifier === TableIdentifier("table_name", Some("default")))
+      assert(table.storage.properties.contains("compression"))
+      assert(table.storage.properties.getOrElse("compression", "foo") == "zstd")
+    }
+  }
+
   test("Replace: basic behavior") {
     spark.sql(
       "CREATE TABLE testcat.table_name (id bigint, data string) USING foo PARTITIONED BY (id)")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org