Posted to issues@iceberg.apache.org by "nastra (via GitHub)" <gi...@apache.org> on 2023/05/10 06:40:44 UTC

[GitHub] [iceberg] nastra commented on a diff in pull request #7499: Doc: Updates Writing to Partitioned Table Spark Docs

nastra commented on code in PR #7499:
URL: https://github.com/apache/iceberg/pull/7499#discussion_r1189418388


##########
docs/spark-writes.md:
##########
@@ -339,74 +331,61 @@ USING iceberg
 PARTITIONED BY (days(ts), category)
 ```
 
-To write data to the sample table, your data needs to be sorted by `days(ts), category`.
-
-If you're inserting data with SQL statement, you can use `ORDER BY` to achieve it, like below:
+To write data to the sample table, data needs to be sorted by `days(ts), category` but this is taken care
+of automatically by the default `hash` distribution. Previously this would have required manually sorting, but this 
+is no longer the case.

Review Comment:
   after reading this sentence, it's not clear what the SQL example below is trying to tell me. Maybe add a sentence saying that previously an `ORDER BY` was required in the SQL example below
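
For reference, a minimal sketch of what the reworded paragraph describes, assuming the doc's sample `prod.db.sample` table and a Spark 3.x session: with the default `hash` distribution mode a plain DataFrame append works without any explicit sort, because the rows are shuffled by partition value before the write.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("iceberg-hash-write").getOrCreate()

// No ORDER BY / sortWithinPartitions needed here: the default
// write.distribution-mode = 'hash' clusters rows by partition value
// (days(ts), category) before they reach the file writers.
spark.table("another_table")
  .select("id", "data", "category", "ts")
  .writeTo("prod.db.sample")
  .append()
```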



##########
docs/spark-writes.md:
##########
@@ -339,74 +331,61 @@ USING iceberg
 PARTITIONED BY (days(ts), category)
 ```
 
-To write data to the sample table, your data needs to be sorted by `days(ts), category`.
-
-If you're inserting data with SQL statement, you can use `ORDER BY` to achieve it, like below:
+To write data to the sample table, data needs to be sorted by `days(ts), category` but this is taken care
+of automatically by the default `hash` distribution. Previously this would have required manually sorting, but this 
+is no longer the case.
 
 ```sql
 INSERT INTO prod.db.sample
 SELECT id, data, category, ts FROM another_table
-ORDER BY ts, category
-```
-
-If you're inserting data with DataFrame, you can use either `orderBy`/`sort` to trigger global sort, or `sortWithinPartitions`
-to trigger local sort. Local sort for example:
-
-```scala
-data.sortWithinPartitions("ts", "category")
-    .writeTo("prod.db.sample")
-    .append()
 ```
 
-You can simply add the original column to the sort condition for the most partition transformations, except `bucket`.
-
-For `bucket` partition transformation, you need to register the Iceberg transform function in Spark to specify it during sort.
-
-Let's go through another sample table having bucket partition:
-
-```sql
-CREATE TABLE prod.db.sample (
-    id bigint,
-    data string,
-    category string,
-    ts timestamp)
-USING iceberg
-PARTITIONED BY (bucket(16, id))
-```
-
-You need to register the function to deal with bucket, like below:
-
-```scala
-import org.apache.iceberg.spark.IcebergSpark
-import org.apache.spark.sql.types.DataTypes
-
-IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
-```
-
-{{< hint info >}}
-Explicit registration of the function is necessary because Spark doesn't allow Iceberg to provide functions.
-[SPARK-27658](https://issues.apache.org/jira/browse/SPARK-27658) is filed to enable Iceberg to provide functions
-which can be used in query.
-{{< /hint >}}
-
-Here we just registered the bucket function as `iceberg_bucket16`, which can be used in sort clause.
-
-If you're inserting data with SQL statement, you can use the function like below:
-
-```sql
-INSERT INTO prod.db.sample
-SELECT id, data, category, ts FROM another_table
-ORDER BY iceberg_bucket16(id)
-```
-
-If you're inserting data with DataFrame, you can use the function like below:
-
-```scala
-data.sortWithinPartitions(expr("iceberg_bucket16(id)"))
-    .writeTo("prod.db.sample")
-    .append()
-```
 
+There are 3 options for `write.distribution-mode`
+
+* `none` - This is the previous default for Iceberg.  
+This mode does not request any shuffles or sort to be performed automatically by Spark. Because no work is done 
+automatically by Spark, the data must be *manually* sorted by partition value. The data must be sorted either within 
+each spark task, or globally within the entire dataset. A global sort will minimize the number of output files.  
+A sort can be avoided by using the Spark [write fanout](#write-properties) property but this will cause all 
+file handles to remain open until each write task has completed.
+* `hash` - This mode is the new default and requests that Spark uses a hash-based exchange to shuffle the incoming
+write data before writing.  
+Practically, this means that each row is hashed based on the row's partition value and then placed
+in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
+the [Spark's Adaptive Query planning](#controlling-file-sizes).

Review Comment:
   ```suggestion
   [Spark's Adaptive Query planning](#controlling-file-sizes).
   ```
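
As a hedged illustration of the `none` bullet above, here is one way the manual clustering it describes could look; the session setup and the explicit property change are assumptions, and the local sort mirrors the `sortWithinPartitions` example this PR removes.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("iceberg-none-write").getOrCreate()

// Opt this table out of the automatic shuffle (illustrative only).
spark.sql(
  "ALTER TABLE prod.db.sample SET TBLPROPERTIES ('write.distribution-mode' = 'none')")

// With 'none', rows must be clustered by partition value manually: either a
// local sort per task (below) or a global orderBy, which minimizes output files.
// The fanout writer property linked above avoids the sort instead, at the cost
// of keeping every file handle open until the write task completes.
spark.table("another_table")
  .select("id", "data", "category", "ts")
  .sortWithinPartitions("ts", "category")
  .writeTo("prod.db.sample")
  .append()
```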



##########
docs/spark-writes.md:
##########
@@ -339,74 +331,61 @@ USING iceberg
 PARTITIONED BY (days(ts), category)
 ```
 
-To write data to the sample table, your data needs to be sorted by `days(ts), category`.
-
-If you're inserting data with SQL statement, you can use `ORDER BY` to achieve it, like below:
+To write data to the sample table, data needs to be sorted by `days(ts), category` but this is taken care
+of automatically by the default `hash` distribution. Previously this would have required manually sorting, but this 
+is no longer the case.
 
 ```sql
 INSERT INTO prod.db.sample
 SELECT id, data, category, ts FROM another_table
-ORDER BY ts, category
-```
-
-If you're inserting data with DataFrame, you can use either `orderBy`/`sort` to trigger global sort, or `sortWithinPartitions`
-to trigger local sort. Local sort for example:
-
-```scala
-data.sortWithinPartitions("ts", "category")
-    .writeTo("prod.db.sample")
-    .append()
 ```
 
-You can simply add the original column to the sort condition for the most partition transformations, except `bucket`.
-
-For `bucket` partition transformation, you need to register the Iceberg transform function in Spark to specify it during sort.
-
-Let's go through another sample table having bucket partition:
-
-```sql
-CREATE TABLE prod.db.sample (
-    id bigint,
-    data string,
-    category string,
-    ts timestamp)
-USING iceberg
-PARTITIONED BY (bucket(16, id))
-```
-
-You need to register the function to deal with bucket, like below:
-
-```scala
-import org.apache.iceberg.spark.IcebergSpark
-import org.apache.spark.sql.types.DataTypes
-
-IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
-```
-
-{{< hint info >}}
-Explicit registration of the function is necessary because Spark doesn't allow Iceberg to provide functions.
-[SPARK-27658](https://issues.apache.org/jira/browse/SPARK-27658) is filed to enable Iceberg to provide functions
-which can be used in query.
-{{< /hint >}}
-
-Here we just registered the bucket function as `iceberg_bucket16`, which can be used in sort clause.
-
-If you're inserting data with SQL statement, you can use the function like below:
-
-```sql
-INSERT INTO prod.db.sample
-SELECT id, data, category, ts FROM another_table
-ORDER BY iceberg_bucket16(id)
-```
-
-If you're inserting data with DataFrame, you can use the function like below:
-
-```scala
-data.sortWithinPartitions(expr("iceberg_bucket16(id)"))
-    .writeTo("prod.db.sample")
-    .append()
-```
 
+There are 3 options for `write.distribution-mode`
+
+* `none` - This is the previous default for Iceberg.  
+This mode does not request any shuffles or sort to be performed automatically by Spark. Because no work is done 
+automatically by Spark, the data must be *manually* sorted by partition value. The data must be sorted either within 
+each spark task, or globally within the entire dataset. A global sort will minimize the number of output files.  
+A sort can be avoided by using the Spark [write fanout](#write-properties) property but this will cause all 
+file handles to remain open until each write task has completed.
+* `hash` - This mode is the new default and requests that Spark uses a hash-based exchange to shuffle the incoming
+write data before writing.  
+Practically, this means that each row is hashed based on the row's partition value and then placed
+in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
+the [Spark's Adaptive Query planning](#controlling-file-sizes).
+* `range` - This mode requests that Spark perform a range based exchanged to shuffle the data before writing.  
+This is a two stage procedure which is more expensive than the `hash` mode. The first stage samples the data to 
+be written based on the partition and sort columns. The second stage uses the range information to shuffle the input data into Spark 
+tasks. Each task gets an exclusive range of the input data which clusters the data by partition and also globally sorts.  
+While this is more expensive than the hash distribution, the global ordering can be beneficial for read performance if
+sorted columns are used during queries. This mode is used by default if a table is created with a 
+sort-order. Further division and coalescing of tasks may take place because of
+[Spark's Adaptive Query planning](#controlling-file-sizes).
+
+
+## Controlling File Sizes
+
+When writing data to Iceberg with Spark, it's important to note that Spark cannot write a file larger than a Spark 
+task and a file cannot span an Iceberg partition boundary. This means although Iceberg will always roll over a file 
+when it grows to [`write.target-file-size-bytes`](../configuration/#write-properties), but unless the Spark task is 
+large enough that will not happen. The size of the file created on disk will also be much smaller than the Spark task 
+since the on disk data will be both compressed and in columnar format as opposed to Spark's uncompressed row 
+representation. This means a 100 megabyte Spark task will create a file much smaller than 100 megabytes even if that
+task is writing to a single Iceberg partition. If the task writes to multiple partitions, the files will be even
+smaller than that.
+
+To control what data ends up in each Spark task use a [`write distribution mode`](#writing-distribution-modes) 
+or manually repartition the data. 
+
+To adjust Spark's task size it is important to become familiar with Spark's various Adaptive Query Execution (AQE) 
+parameters. When the `write.distribution-mode` is not `none`, AQE will control the coalescing and splitting of Spark
+tasks during the exchange to try to create tasks of `spark.sql.adaptive.advisoryPartitionSizeInBytes` size. These 
+settings will also effect any user performed re-partitions or sorts. 
+It is important again to note that this is the in-memory Spark row size and not the on disk
+columnar-compressed size, so a larger value that the target file size will need to be specified. The ratio of 

Review Comment:
   ```suggestion
   columnar-compressed size, so a larger value than the target file size will need to be specified. The ratio of 
   ```
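
A rough sketch of the file-size tuning discussed above; the byte values are illustrative assumptions, not recommendations from the PR. `write.target-file-size-bytes` is an Iceberg table property, while the AQE advisory size is a Spark session setting that should be larger than the on-disk target because it measures uncompressed, row-oriented data.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("iceberg-file-size-tuning").getOrCreate()

// Iceberg rolls a data file over at this size, but only if the Spark task
// actually carries that much data for a single partition (512 MB here).
spark.sql(
  "ALTER TABLE prod.db.sample SET TBLPROPERTIES ('write.target-file-size-bytes' = '536870912')")

// AQE coalesces/splits shuffle tasks toward this in-memory size; it is set
// larger than the on-disk target because the written files are compressed and
// columnar while Spark rows are not (illustrative 2x ratio).
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "1073741824")
```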



##########
docs/spark-writes.md:
##########
@@ -339,74 +331,61 @@ USING iceberg
 PARTITIONED BY (days(ts), category)
 ```
 
-To write data to the sample table, your data needs to be sorted by `days(ts), category`.
-
-If you're inserting data with SQL statement, you can use `ORDER BY` to achieve it, like below:
+To write data to the sample table, data needs to be sorted by `days(ts), category` but this is taken care
+of automatically by the default `hash` distribution. Previously this would have required manually sorting, but this 
+is no longer the case.
 
 ```sql
 INSERT INTO prod.db.sample
 SELECT id, data, category, ts FROM another_table
-ORDER BY ts, category
-```
-
-If you're inserting data with DataFrame, you can use either `orderBy`/`sort` to trigger global sort, or `sortWithinPartitions`
-to trigger local sort. Local sort for example:
-
-```scala
-data.sortWithinPartitions("ts", "category")
-    .writeTo("prod.db.sample")
-    .append()
 ```
 
-You can simply add the original column to the sort condition for the most partition transformations, except `bucket`.
-
-For `bucket` partition transformation, you need to register the Iceberg transform function in Spark to specify it during sort.
-
-Let's go through another sample table having bucket partition:
-
-```sql
-CREATE TABLE prod.db.sample (
-    id bigint,
-    data string,
-    category string,
-    ts timestamp)
-USING iceberg
-PARTITIONED BY (bucket(16, id))
-```
-
-You need to register the function to deal with bucket, like below:
-
-```scala
-import org.apache.iceberg.spark.IcebergSpark
-import org.apache.spark.sql.types.DataTypes
-
-IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
-```
-
-{{< hint info >}}
-Explicit registration of the function is necessary because Spark doesn't allow Iceberg to provide functions.
-[SPARK-27658](https://issues.apache.org/jira/browse/SPARK-27658) is filed to enable Iceberg to provide functions
-which can be used in query.
-{{< /hint >}}
-
-Here we just registered the bucket function as `iceberg_bucket16`, which can be used in sort clause.
-
-If you're inserting data with SQL statement, you can use the function like below:
-
-```sql
-INSERT INTO prod.db.sample
-SELECT id, data, category, ts FROM another_table
-ORDER BY iceberg_bucket16(id)
-```
-
-If you're inserting data with DataFrame, you can use the function like below:
-
-```scala
-data.sortWithinPartitions(expr("iceberg_bucket16(id)"))
-    .writeTo("prod.db.sample")
-    .append()
-```
 
+There are 3 options for `write.distribution-mode`
+
+* `none` - This is the previous default for Iceberg.  
+This mode does not request any shuffles or sort to be performed automatically by Spark. Because no work is done 
+automatically by Spark, the data must be *manually* sorted by partition value. The data must be sorted either within 
+each spark task, or globally within the entire dataset. A global sort will minimize the number of output files.  
+A sort can be avoided by using the Spark [write fanout](#write-properties) property but this will cause all 
+file handles to remain open until each write task has completed.
+* `hash` - This mode is the new default and requests that Spark uses a hash-based exchange to shuffle the incoming
+write data before writing.  
+Practically, this means that each row is hashed based on the row's partition value and then placed
+in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
+the [Spark's Adaptive Query planning](#controlling-file-sizes).
+* `range` - This mode requests that Spark perform a range based exchanged to shuffle the data before writing.  
+This is a two stage procedure which is more expensive than the `hash` mode. The first stage samples the data to 
+be written based on the partition and sort columns. The second stage uses the range information to shuffle the input data into Spark 
+tasks. Each task gets an exclusive range of the input data which clusters the data by partition and also globally sorts.  
+While this is more expensive than the hash distribution, the global ordering can be beneficial for read performance if
+sorted columns are used during queries. This mode is used by default if a table is created with a 
+sort-order. Further division and coalescing of tasks may take place because of
+[Spark's Adaptive Query planning](#controlling-file-sizes).
+
+
+## Controlling File Sizes
+
+When writing data to Iceberg with Spark, it's important to note that Spark cannot write a file larger than a Spark 
+task and a file cannot span an Iceberg partition boundary. This means although Iceberg will always roll over a file 
+when it grows to [`write.target-file-size-bytes`](../configuration/#write-properties), but unless the Spark task is 
+large enough that will not happen. The size of the file created on disk will also be much smaller than the Spark task 
+since the on disk data will be both compressed and in columnar format as opposed to Spark's uncompressed row 
+representation. This means a 100 megabyte Spark task will create a file much smaller than 100 megabytes even if that
+task is writing to a single Iceberg partition. If the task writes to multiple partitions, the files will be even
+smaller than that.
+
+To control what data ends up in each Spark task use a [`write distribution mode`](#writing-distribution-modes) 
+or manually repartition the data. 
+
+To adjust Spark's task size it is important to become familiar with Spark's various Adaptive Query Execution (AQE) 
+parameters. When the `write.distribution-mode` is not `none`, AQE will control the coalescing and splitting of Spark
+tasks during the exchange to try to create tasks of `spark.sql.adaptive.advisoryPartitionSizeInBytes` size. These 
+settings will also effect any user performed re-partitions or sorts. 

Review Comment:
   effect -> affect?
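
Tying the paragraphs above together, a hedged sketch of the two levers the text mentions for controlling what lands in each Spark task: a table-level distribution mode, or a user-performed repartition/sort, which is also subject to the AQE advisory size. The column choices below are assumptions reused from the doc's sample table.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("iceberg-task-layout").getOrCreate()

// Option A: ask Iceberg to request the exchange ('none', 'hash', or 'range';
// the text above notes 'range' is the default when the table declares a sort order).
spark.sql(
  "ALTER TABLE prod.db.sample SET TBLPROPERTIES ('write.distribution-mode' = 'range')")

// Option B: shape the tasks manually before writing; AQE may still coalesce or
// split these partitions toward the advisory size.
spark.table("another_table")
  .select("id", "data", "category", "ts")
  .repartition(col("category"))
  .sortWithinPartitions("ts", "category")
  .writeTo("prod.db.sample")
  .append()
```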



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

