Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/11/27 22:41:25 UTC

[iceberg] branch master updated: Spark 3.3: Fix a separate table cache being created for each rewriteFiles (#5392)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4caf1b44ec Spark 3.3: Fix a separate table cache being created for each rewriteFiles (#5392)
4caf1b44ec is described below

commit 4caf1b44ec9d1ad6c9e90ccd1203b77c45784b87
Author: Manu Zhang <ti...@ebay.com>
AuthorDate: Mon Nov 28 06:41:20 2022 +0800

    Spark 3.3: Fix a separate table cache being created for each rewriteFiles (#5392)
---
 .../iceberg/spark/actions/RewriteDataFilesSparkAction.java |  5 ++++-
 .../apache/iceberg/spark/actions/SparkBinPackStrategy.java |  7 +------
 .../apache/iceberg/spark/actions/SparkSortStrategy.java    | 12 ++++--------
 .../apache/iceberg/spark/actions/SparkZOrderStrategy.java  | 14 ++++++--------
 4 files changed, 15 insertions(+), 23 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index f754fcb4c7..eeb4e49e30 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -68,6 +68,7 @@ import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.StructLikeMap;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -96,7 +97,9 @@ public class RewriteDataFilesSparkAction
   private RewriteStrategy strategy = null;
 
   RewriteDataFilesSparkAction(SparkSession spark, Table table) {
-    super(spark);
+    super(spark.cloneSession());
+    // Disable Adaptive Query Execution as this may change the output partitioning of our write
+    spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
     this.table = table;
   }
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index aaa63c0141..483b06de4e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -34,7 +34,6 @@ import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;
 
 public class SparkBinPackStrategy extends BinPackStrategy {
   private final Table table;
@@ -60,12 +59,8 @@ public class SparkBinPackStrategy extends BinPackStrategy {
       tableCache.add(groupID, table);
       manager.stageTasks(table, groupID, filesToRewrite);
 
-      // Disable Adaptive Query Execution as this may change the output partitioning of our write
-      SparkSession cloneSession = spark.cloneSession();
-      cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
-
       Dataset<Row> scanDF =
-          cloneSession
+          spark
               .read()
               .format("iceberg")
               .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
index 285a46fc54..d79360034f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
@@ -119,26 +119,22 @@ public class SparkSortStrategy extends SortStrategy {
       tableCache.add(groupID, table);
       manager.stageTasks(table, groupID, filesToRewrite);
 
-      // Disable Adaptive Query Execution as this may change the output partitioning of our write
-      SparkSession cloneSession = spark.cloneSession();
-      cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
-
       // Reset Shuffle Partitions for our sort
       long numOutputFiles =
           numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
-      cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
+      spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
 
       Dataset<Row> scanDF =
-          cloneSession
+          spark
               .read()
               .format("iceberg")
               .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
               .load(groupID);
 
       // write the packed data into new files where each split becomes a new file
-      SQLConf sqlConf = cloneSession.sessionState().conf();
+      SQLConf sqlConf = spark.sessionState().conf();
       LogicalPlan sortPlan = sortPlan(distribution, ordering, scanDF.logicalPlan(), sqlConf);
-      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+      Dataset<Row> sortedDf = new Dataset<>(spark, sortPlan, scanDF.encoder());
 
       sortedDf
           .write()
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index 2b3397c9db..ed09e974b9 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -202,17 +202,15 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
       tableCache().add(groupID, table());
       manager().stageTasks(table(), groupID, filesToRewrite);
 
-      // Disable Adaptive Query Execution as this may change the output partitioning of our write
-      SparkSession cloneSession = spark().cloneSession();
-      cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
-
+      // spark session from parent
+      SparkSession spark = spark();
       // Reset Shuffle Partitions for our sort
       long numOutputFiles =
           numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple()));
-      cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
+      spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
 
       Dataset<Row> scanDF =
-          cloneSession
+          spark
               .read()
               .format("iceberg")
               .option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
@@ -235,9 +233,9 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
 
       Dataset<Row> zvalueDF = scanDF.withColumn(Z_COLUMN, zOrderUDF.interleaveBytes(zvalueArray));
 
-      SQLConf sqlConf = cloneSession.sessionState().conf();
+      SQLConf sqlConf = spark.sessionState().conf();
       LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf);
-      Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, zvalueDF.encoder());
+      Dataset<Row> sortedDf = new Dataset<>(spark, sortPlan, zvalueDF.encoder());
       sortedDf
           .select(originalColumns)
           .write()
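
The pattern this commit settles on is: clone the Spark session once when the action is constructed, disable Adaptive Query Execution on that clone, and let every strategy reuse it, so the cloned session state (and the table cache tied to it) is created a single time rather than on every rewriteFiles call. The sketch below illustrates that shape outside the Iceberg codebase. It is a minimal, hypothetical example: the class and method names are invented for illustration, and only the Spark calls it makes (SparkSession#cloneSession, RuntimeConfig#set, SQLConf.ADAPTIVE_EXECUTION_ENABLED, SQLConf.SHUFFLE_PARTITIONS) are taken from the diff above.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;

// Hypothetical sketch of the "one session clone per action" pattern from this commit.
class RewriteActionSketch {
  private final SparkSession spark;

  RewriteActionSketch(SparkSession parent) {
    // Clone once per action. The clone gets its own session state, so the
    // conf changes below do not leak into the caller's session.
    this.spark = parent.cloneSession();
    // Disable Adaptive Query Execution, since AQE may change the output
    // partitioning of the rewrite's write (same reasoning as the commit).
    this.spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
  }

  Dataset<Row> scanGroup(String groupId, long numOutputFiles) {
    // Per-group tuning still happens, but against the shared clone instead of
    // a fresh clone (and a fresh table cache) for every rewrite group.
    spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
    // The real strategies also pass Iceberg read options such as
    // SparkReadOptions.FILE_SCAN_TASK_SET_ID; omitted here for brevity.
    return spark.read().format("iceberg").load(groupId);
  }
}

Design note: cloneSession copies the parent's session state, so each clone resolves catalogs (and their table caches) independently. Cloning inside each strategy's rewriteFiles therefore produced a separate table cache per rewrite group, which is what the commit title refers to; cloning once in RewriteDataFilesSparkAction lets all groups of one action share a single cache.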