You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2022/11/27 22:41:25 UTC
[iceberg] branch master updated: Spark 3.3: Fix a separate table cache being created for each rewriteFiles (#5392)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4caf1b44ec Spark 3.3: Fix a separate table cache being created for each rewriteFiles (#5392)
4caf1b44ec is described below
commit 4caf1b44ec9d1ad6c9e90ccd1203b77c45784b87
Author: Manu Zhang <ti...@ebay.com>
AuthorDate: Mon Nov 28 06:41:20 2022 +0800
Spark 3.3: Fix a separate table cache being created for each rewriteFiles (#5392)
---
.../iceberg/spark/actions/RewriteDataFilesSparkAction.java | 5 ++++-
.../apache/iceberg/spark/actions/SparkBinPackStrategy.java | 7 +------
.../apache/iceberg/spark/actions/SparkSortStrategy.java | 12 ++++--------
.../apache/iceberg/spark/actions/SparkZOrderStrategy.java | 14 ++++++--------
4 files changed, 15 insertions(+), 23 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index f754fcb4c7..eeb4e49e30 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -68,6 +68,7 @@ import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.internal.SQLConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -96,7 +97,9 @@ public class RewriteDataFilesSparkAction
private RewriteStrategy strategy = null;
RewriteDataFilesSparkAction(SparkSession spark, Table table) {
- super(spark);
+ super(spark.cloneSession());
+ // Disable Adaptive Query Execution as this may change the output partitioning of our write
+ spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
this.table = table;
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index aaa63c0141..483b06de4e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -34,7 +34,6 @@ import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.internal.SQLConf;
public class SparkBinPackStrategy extends BinPackStrategy {
private final Table table;
@@ -60,12 +59,8 @@ public class SparkBinPackStrategy extends BinPackStrategy {
tableCache.add(groupID, table);
manager.stageTasks(table, groupID, filesToRewrite);
- // Disable Adaptive Query Execution as this may change the output partitioning of our write
- SparkSession cloneSession = spark.cloneSession();
- cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
-
Dataset<Row> scanDF =
- cloneSession
+ spark
.read()
.format("iceberg")
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
index 285a46fc54..d79360034f 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
@@ -119,26 +119,22 @@ public class SparkSortStrategy extends SortStrategy {
tableCache.add(groupID, table);
manager.stageTasks(table, groupID, filesToRewrite);
- // Disable Adaptive Query Execution as this may change the output partitioning of our write
- SparkSession cloneSession = spark.cloneSession();
- cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
-
// Reset Shuffle Partitions for our sort
long numOutputFiles =
numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple));
- cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
+ spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
Dataset<Row> scanDF =
- cloneSession
+ spark
.read()
.format("iceberg")
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
.load(groupID);
// write the packed data into new files where each split becomes a new file
- SQLConf sqlConf = cloneSession.sessionState().conf();
+ SQLConf sqlConf = spark.sessionState().conf();
LogicalPlan sortPlan = sortPlan(distribution, ordering, scanDF.logicalPlan(), sqlConf);
- Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, scanDF.encoder());
+ Dataset<Row> sortedDf = new Dataset<>(spark, sortPlan, scanDF.encoder());
sortedDf
.write()
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index 2b3397c9db..ed09e974b9 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -202,17 +202,15 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
tableCache().add(groupID, table());
manager().stageTasks(table(), groupID, filesToRewrite);
- // Disable Adaptive Query Execution as this may change the output partitioning of our write
- SparkSession cloneSession = spark().cloneSession();
- cloneSession.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
-
+ // spark session from parent
+ SparkSession spark = spark();
// Reset Shuffle Partitions for our sort
long numOutputFiles =
numOutputFiles((long) (inputFileSize(filesToRewrite) * sizeEstimateMultiple()));
- cloneSession.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
+ spark.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), Math.max(1, numOutputFiles));
Dataset<Row> scanDF =
- cloneSession
+ spark
.read()
.format("iceberg")
.option(SparkReadOptions.FILE_SCAN_TASK_SET_ID, groupID)
@@ -235,9 +233,9 @@ public class SparkZOrderStrategy extends SparkSortStrategy {
Dataset<Row> zvalueDF = scanDF.withColumn(Z_COLUMN, zOrderUDF.interleaveBytes(zvalueArray));
- SQLConf sqlConf = cloneSession.sessionState().conf();
+ SQLConf sqlConf = spark.sessionState().conf();
LogicalPlan sortPlan = sortPlan(distribution, ordering, zvalueDF.logicalPlan(), sqlConf);
- Dataset<Row> sortedDf = new Dataset<>(cloneSession, sortPlan, zvalueDF.encoder());
+ Dataset<Row> sortedDf = new Dataset<>(spark, sortPlan, zvalueDF.encoder());
sortedDf
.select(originalColumns)
.write()