Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/01/25 02:23:22 UTC

[GitHub] [iceberg] aokolnychyi opened a new pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

aokolnychyi opened a new pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970


   This PR changes the distribution and ordering for merge-on-read DELETE.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791307279



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
##########
@@ -244,14 +244,8 @@ public DistributionMode positionDeleteDistributionMode() {
         return deleteMode;
       }
     } else {
-      // use hash distribution if write distribution is range or hash and table is partitioned
-      // avoid range-based shuffles unless the user asks explicitly
-      DistributionMode writeMode = distributionMode();
-      if (writeMode != NONE && table.spec().isPartitioned()) {
-        return HASH;
-      } else {
-        return writeMode;
-      }
+      // avoid range-based shuffles for partitioned tables
+      return table.spec().isPartitioned() ? HASH : RANGE;

Review comment:
       My current thinking is that we should probably request some distribution by default. If the table is partitioned, we should cluster by spec ID and partition to avoid the range-based shuffle. In all other cases, it is probably better to ask for an ordered distribution by spec ID, partition, file and position to reduce the number of produced delete files. Having a lot of delete files will probably cost us more than doing some work during the DELETE operation.
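For illustration only, the new default in the diff above (hash for partitioned tables, range otherwise) can be modeled as a tiny standalone check. `DistributionMode` here is a local stand-in, not Iceberg's actual enum:

```java
// Toy model of the position delete distribution default when the user has
// not set a delete distribution mode; the enum is a local stand-in for
// Iceberg's org.apache.iceberg.DistributionMode.
public class PositionDeleteModeSketch {
  enum DistributionMode { NONE, HASH, RANGE }

  // Partitioned tables cluster (HASH) by spec ID and partition to avoid a
  // range-based shuffle; unpartitioned tables fall back to a RANGE
  // distribution ordered by spec ID, partition, file, and position.
  static DistributionMode defaultDeleteMode(boolean isPartitioned) {
    return isPartitioned ? DistributionMode.HASH : DistributionMode.RANGE;
  }

  public static void main(String[] args) {
    System.out.println(defaultDeleteMode(true));   // HASH
    System.out.println(defaultDeleteMode(false));  // RANGE
  }
}
```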






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791305876



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDistributionAndOrderingUtil.java
##########
@@ -163,7 +163,7 @@ private static Distribution positionDeleteDistribution(DistributionMode distribu
         return Distributions.clustered(clustering);
 
       case RANGE:
-        SortOrder[] ordering = new SortOrder[]{SPEC_ID_ORDER, PARTITION_ORDER, FILE_PATH_ORDER};
+        SortOrder[] ordering = new SortOrder[]{SPEC_ID_ORDER, PARTITION_ORDER, FILE_PATH_ORDER, ROW_POSITION_ORDER};

Review comment:
       I think including the row position may allow us to avoid straggler tasks when a particular file has most of the deletes.


##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED

Review comment:
       I think it does not matter whether the table is sorted or not for position deletes.

##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED
+  // -------------------------------------------------------------------------
+  // delete mode is NOT SET -> ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is HASH -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       We may allow `HASH` even if the current spec is unpartitioned but there are multiple specs.
   I am not sure, though. Thoughts?








[GitHub] [iceberg] rdblue commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791312059



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED
+  // -------------------------------------------------------------------------
+  // delete mode is NOT SET -> ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       Why default to RANGE here? Is this what we recommend? I'd probably default to HASH because there will be few files. RANGE could take a single file's worth of deletes and accidentally spread them across 200 delete files.
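The fan-out concern above can be sketched with a toy simulation (the file name, the delete count, and the 200-partition shuffle default are illustrative values, not taken from this PR):

```java
import java.util.HashSet;
import java.util.Set;

// Toy illustration, not Iceberg code: with a RANGE distribution over
// (_file, _pos), deletes for a single data file span the whole sort range
// and fan out across shuffle partitions, while hashing by _file keeps them
// in one task (and therefore one delete file).
public class RangeVsHashSketch {
  static int rangeTaskCount(int deletes, int partitions) {
    Set<Integer> tasks = new HashSet<>();
    for (int pos = 0; pos < deletes; pos++) {
      tasks.add(pos * partitions / deletes); // even range split over positions
    }
    return tasks.size();
  }

  static int hashTaskCount(String file, int deletes, int partitions) {
    Set<Integer> tasks = new HashSet<>();
    for (int pos = 0; pos < deletes; pos++) {
      tasks.add(Math.floorMod(file.hashCode(), partitions)); // same key every time
    }
    return tasks.size();
  }

  public static void main(String[] args) {
    // 1000 position deletes against one file, 200 shuffle partitions
    System.out.println(rangeTaskCount(1000, 200));                          // 200
    System.out.println(hashTaskCount("data-file-001.parquet", 1000, 200));  // 1
  }
}
```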






[GitHub] [iceberg] jackye1995 merged pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
jackye1995 merged pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970


   






[GitHub] [iceberg] rdblue commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791311579



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED

Review comment:
       Agreed. Position deletes require their own sort order by _file and _pos that we will always use.






[GitHub] [iceberg] rdblue commented on pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#issuecomment-1020773464


   Looks good now. +1 when tests are passing.




[GitHub] [iceberg] rdblue commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791313586



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED
+  // -------------------------------------------------------------------------
+  // delete mode is NOT SET -> ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is HASH -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       Actually, no. I think I'd cluster by _spec_id and _partition here. That minimizes the number of files. I'd do this as the default (NONE) and when mode is HASH.






[GitHub] [iceberg] rdblue commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791312538



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED
+  // -------------------------------------------------------------------------
+  // delete mode is NOT SET -> ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is HASH -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       This looks good to me.








[GitHub] [iceberg] rdblue commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791311792



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED
+  // -------------------------------------------------------------------------
+  // delete mode is NOT SET -> ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       This should be fine since there is no partition, but you technically don't need _spec_id and _partition.






[GitHub] [iceberg] rdblue commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791311579



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED
+  // -------------------------------------------------------------------------
+  // delete mode is NOT SET -> ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       Oh, I see. Other specs are why we need to include _spec_id and _partition. Good call.







[GitHub] [iceberg] rdblue commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791314487



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED
+  // -------------------------------------------------------------------------
+  // delete mode is NOT SET -> ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is HASH -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is RANGE -> ORDER BY _spec_id, _partition, _file, _pos
+  //
+  // PARTITIONED
+  // -------------------------------------------------------------------------
+  // delete mode is NOT SET -> CLUSTER BY _spec_id, _partition + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is NONE -> unspecified distribution + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is HASH -> CLUSTER BY _spec_id, _partition + LOCALLY ORDER BY _spec_id, _partition, _file, _pos
+  // delete mode is RANGE -> ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       I think that this set should be what we use for both UNPARTITIONED and PARTITIONED cases. We are writing deletes for the existing file layout, not the current partition spec. 
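The mode tables in the diff above can be summarized in code. This is an illustrative sketch only (the class and method names are invented for the example, and the distributions are described as strings rather than Spark `Distribution` objects); note that the local ordering is the same four metadata columns in every mode, and only the distribution varies:

```java
import java.util.List;

// Illustrative mapping from delete distribution mode to the expected
// distribution and ordering for merge-on-read position deletes.
final class DeleteWriteLayout {
  // Every mode locally orders by the same metadata columns.
  static final List<String> ORDER =
      List.of("_spec_id", "_partition", "_file", "_pos");

  static String distribution(String mode, boolean partitioned) {
    switch (mode) {
      case "NONE":
        return "unspecified";
      case "HASH":
        // hash-clustering buys nothing for an unpartitioned table, so it
        // degrades to an unspecified distribution there
        return partitioned ? "cluster by _spec_id, _partition" : "unspecified";
      case "RANGE":
        return "order by _spec_id, _partition, _file, _pos";
      default:
        // mode not set: partition-aware default, as in the tables above
        return partitioned
            ? "cluster by _spec_id, _partition"
            : "order by _spec_id, _partition, _file, _pos";
    }
  }
}
```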






[GitHub] [iceberg] aokolnychyi commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
aokolnychyi commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791305876



##########
File path: spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkDistributionAndOrderingUtil.java
##########
@@ -163,7 +163,7 @@ private static Distribution positionDeleteDistribution(DistributionMode distribu
         return Distributions.clustered(clustering);
 
       case RANGE:
-        SortOrder[] ordering = new SortOrder[]{SPEC_ID_ORDER, PARTITION_ORDER, FILE_PATH_ORDER};
+        SortOrder[] ordering = new SortOrder[]{SPEC_ID_ORDER, PARTITION_ORDER, FILE_PATH_ORDER, ROW_POSITION_ORDER};

Review comment:
       I think including the row position may allow us to avoid straggler tasks when a particular file has most of the deletes.
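A sketch of the revised range-ordering key. The field names match the metadata columns in the diff, but the `SortKey` record is illustrative, standing in for Spark's `SortOrder` expressions:

```java
import java.util.List;

// Illustrative stand-in for a Spark SortOrder expression.
record SortKey(String field, String direction) {}

final class PositionDeleteOrdering {
  // Appending _pos lets a range distribution split deletes that target a
  // single large file across several partitions, instead of funneling them
  // into one straggler task.
  static List<SortKey> rangeOrdering() {
    return List.of(
        new SortKey("_spec_id", "ASC"),
        new SortKey("_partition", "ASC"),
        new SortKey("_file", "ASC"),
        new SortKey("_pos", "ASC"));
  }
}
```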






[GitHub] [iceberg] rdblue commented on a change in pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on a change in pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#discussion_r791312425



##########
File path: spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
##########
@@ -1300,6 +1300,202 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, expectedDistribution, expectedOrdering);
   }
 
+  // ===================================================================================
+  // Distribution and ordering for merge-on-read DELETE operations with position deletes
+  // ===================================================================================
+  //
+  // UNPARTITIONED
+  // -------------------------------------------------------------------------
+  // delete mode is NOT SET -> ORDER BY _spec_id, _partition, _file, _pos

Review comment:
       Oh, I see. Other specs are why we need to include _spec_id and _partition. Good call.






[GitHub] [iceberg] rdblue commented on pull request #3970: Spark 3.2: Revise distribution and ordering for merge-on-read DELETE

Posted by GitBox <gi...@apache.org>.
rdblue commented on pull request #3970:
URL: https://github.com/apache/iceberg/pull/3970#issuecomment-1020773464


   Looks good now. +1 when tests are passing.

