Posted to commits@iceberg.apache.org by ru...@apache.org on 2023/05/10 20:58:04 UTC

[iceberg] branch master updated: Doc: Updates Writing to Partitioned Table Spark Docs (#7499)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a06bb5aab Doc: Updates Writing to Partitioned Table Spark Docs (#7499)
2a06bb5aab is described below

commit 2a06bb5aabae96ec8a32a4a80211950338318000
Author: Russell Spitzer <ru...@GMAIL.COM>
AuthorDate: Wed May 10 15:57:59 2023 -0500

    Doc: Updates Writing to Partitioned Table Spark Docs (#7499)
---
 docs/spark-writes.md | 127 +++++++++++++++++++++------------------------------
 1 file changed, 53 insertions(+), 74 deletions(-)

diff --git a/docs/spark-writes.md b/docs/spark-writes.md
index 87a58452c6..7ba1516d65 100644
--- a/docs/spark-writes.md
+++ b/docs/spark-writes.md
@@ -312,20 +312,12 @@ data.writeTo("prod.db.table")
     .createOrReplace()
 ```
 
-## Writing to partitioned tables
+## Writing Distribution Modes
 
-Iceberg requires the data to be sorted according to the partition spec per task (Spark partition) in prior to write
-against partitioned table. This applies both Writing with SQL and Writing with DataFrames.
-
-{{< hint info >}}
-Explicit sort is necessary because Spark doesn't allow Iceberg to request a sort before writing as of Spark 3.0.
-[SPARK-23889](https://issues.apache.org/jira/browse/SPARK-23889) is filed to enable Iceberg to require specific
-distribution & sort order to Spark.
-{{< /hint >}}
-
-{{< hint info >}}
-Both global sort (`orderBy`/`sort`) and local sort (`sortWithinPartitions`) work for the requirement.
-{{< /hint >}}
+Iceberg's default Spark writers require that the data in each Spark task is clustered by partition values. This 
+distribution is required to minimize the number of file handles that are held open while writing. By default, starting
+in Iceberg 1.2.0, Iceberg also requests that Spark pre-sort the data being written to fit this distribution. The
+request to Spark is made through the table property `write.distribution-mode` with the value `hash`.
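+
+If you want to set the mode explicitly rather than relying on the default, a minimal sketch of doing so through
+Spark SQL, assuming an active `spark` session and an example table named `prod.db.table`:
+
+```scala
+// Set the write distribution mode as an Iceberg table property.
+// `prod.db.table` is only an example identifier; substitute your own table.
+spark.sql(
+  "ALTER TABLE prod.db.table " +
+  "SET TBLPROPERTIES ('write.distribution-mode' = 'hash')")
+```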
 
 Let's go through writing data to the sample table below:
 
@@ -339,74 +331,61 @@ USING iceberg
 PARTITIONED BY (days(ts), category)
 ```
 
-To write data to the sample table, your data needs to be sorted by `days(ts), category`.
-
-If you're inserting data with SQL statement, you can use `ORDER BY` to achieve it, like below:
+To write data to the sample table, the data needs to be sorted by `days(ts), category`, but this is taken care
+of automatically by the default `hash` distribution. Previously this would have required manual sorting, but that
+is no longer the case.
 
 ```sql
 INSERT INTO prod.db.sample
 SELECT id, data, category, ts FROM another_table
-ORDER BY ts, category
-```
-
-If you're inserting data with DataFrame, you can use either `orderBy`/`sort` to trigger global sort, or `sortWithinPartitions`
-to trigger local sort. Local sort for example:
-
-```scala
-data.sortWithinPartitions("ts", "category")
-    .writeTo("prod.db.sample")
-    .append()
 ```
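+
+Writing with DataFrames likewise no longer needs an explicit sort. A minimal sketch, assuming `data` is a DataFrame
+whose columns match the sample table's schema:
+
+```scala
+// With the default hash distribution mode, Iceberg asks Spark to cluster rows
+// by partition value before they reach the writer, so no manual sort is needed.
+data.writeTo("prod.db.sample")
+    .append()
+```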
 
-You can simply add the original column to the sort condition for the most partition transformations, except `bucket`.
-
-For `bucket` partition transformation, you need to register the Iceberg transform function in Spark to specify it during sort.
-
-Let's go through another sample table having bucket partition:
-
-```sql
-CREATE TABLE prod.db.sample (
-    id bigint,
-    data string,
-    category string,
-    ts timestamp)
-USING iceberg
-PARTITIONED BY (bucket(16, id))
-```
-
-You need to register the function to deal with bucket, like below:
-
-```scala
-import org.apache.iceberg.spark.IcebergSpark
-import org.apache.spark.sql.types.DataTypes
-
-IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
-```
-
-{{< hint info >}}
-Explicit registration of the function is necessary because Spark doesn't allow Iceberg to provide functions.
-[SPARK-27658](https://issues.apache.org/jira/browse/SPARK-27658) is filed to enable Iceberg to provide functions
-which can be used in query.
-{{< /hint >}}
-
-Here we just registered the bucket function as `iceberg_bucket16`, which can be used in sort clause.
-
-If you're inserting data with SQL statement, you can use the function like below:
-
-```sql
-INSERT INTO prod.db.sample
-SELECT id, data, category, ts FROM another_table
-ORDER BY iceberg_bucket16(id)
-```
-
-If you're inserting data with DataFrame, you can use the function like below:
-
-```scala
-data.sortWithinPartitions(expr("iceberg_bucket16(id)"))
-    .writeTo("prod.db.sample")
-    .append()
-```
 
+There are 3 options for `write.distribution-mode`:
+
+* `none` - This is the previous default for Iceberg.  
+This mode does not request any shuffle or sort to be performed automatically by Spark. Because no work is done 
+automatically by Spark, the data must be *manually* sorted by partition value. The data must be sorted either within 
+each Spark task, or globally within the entire dataset; a local sort is sketched after this list. A global sort will minimize the number of output files.  
+A sort can be avoided by using the Spark [write fanout](#write-properties) property, but this will cause all 
+file handles to remain open until each write task has completed.
+* `hash` - This mode is the new default and requests that Spark use a hash-based exchange to shuffle the incoming
+write data before writing.  
+Practically, this means that each row is hashed based on the row's partition value and then placed
+in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
+[Spark's Adaptive Query planning](#controlling-file-sizes).
+* `range` - This mode requests that Spark perform a range-based exchange to shuffle the data before writing.  
+This is a two-stage procedure which is more expensive than the `hash` mode. The first stage samples the data to 
+be written based on the partition and sort columns. The second stage uses the range information to shuffle the input data into Spark 
+tasks. Each task gets an exclusive range of the input data, which clusters the data by partition and also sorts it globally.  
+While this is more expensive than the hash distribution, the global ordering can be beneficial for read performance if
+sorted columns are used during queries. This mode is used by default if a table is created with a 
+sort-order. Further division and coalescing of tasks may take place because of
+[Spark's Adaptive Query planning](#controlling-file-sizes).
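+
+If `write.distribution-mode` is set to `none`, the required clustering has to be done by hand. A minimal sketch of a
+local sort, assuming `data` is the DataFrame being written to the sample table:
+
+```scala
+// Sorting within each Spark task clusters rows by the partition values
+// (days(ts), category), so the writer only holds a few files open at a time.
+data.sortWithinPartitions("ts", "category")
+    .writeTo("prod.db.sample")
+    .append()
+```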
+
+
+## Controlling File Sizes
+
+When writing data to Iceberg with Spark, it's important to note that Spark cannot write a file larger than a Spark 
+task and a file cannot span an Iceberg partition boundary. This means that although Iceberg will always roll over a file 
+when it grows to [`write.target-file-size-bytes`](../configuration/#write-properties), this roll-over will not happen 
+unless the Spark task is large enough. The size of the file created on disk will also be much smaller than the Spark task 
+since the on-disk data will be both compressed and in columnar format as opposed to Spark's uncompressed row 
+representation. This means a 100 megabyte Spark task will create a file much smaller than 100 megabytes even if that
+task is writing to a single Iceberg partition. If the task writes to multiple partitions, the files will be even
+smaller than that.
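+
+A minimal sketch of raising the roll-over threshold on the sample table (the 512 MB value shown is only
+illustrative):
+
+```scala
+// `write.target-file-size-bytes` controls when Iceberg rolls over to a new file;
+// actual files are still limited by how much data each Spark task receives.
+spark.sql(
+  "ALTER TABLE prod.db.sample " +
+  "SET TBLPROPERTIES ('write.target-file-size-bytes' = '536870912')")
+```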
+
+To control what data ends up in each Spark task, use a [`write distribution mode`](#writing-distribution-modes) 
+or manually repartition the data. 
+
+To adjust Spark's task size, it is important to become familiar with Spark's various Adaptive Query Execution (AQE) 
+parameters. When the `write.distribution-mode` is not `none`, AQE will control the coalescing and splitting of Spark
+tasks during the exchange to try to create tasks of `spark.sql.adaptive.advisoryPartitionSizeInBytes` size. These 
+settings will also affect any user-performed repartitions or sorts. 
+It is important again to note that this is the in-memory Spark row size and not the on-disk
+columnar-compressed size, so a value larger than the target file size will need to be specified. The ratio of 
+in-memory size to on-disk size is data dependent. Future work in Spark should allow Iceberg to automatically adjust this
+parameter at write time to match the `write.target-file-size-bytes`.
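+
+A minimal sketch of adjusting the advisory size for the current session (the value is illustrative; pick it based on
+your data's in-memory to on-disk ratio):
+
+```scala
+// AQE coalesces and splits exchange partitions toward this advisory size.
+// It is an in-memory row size, so it is set well above the on-disk target file size.
+spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1g")
+```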
 
 ## Type compatibility