Posted to issues@iceberg.apache.org by "szehon-ho (via GitHub)" <gi...@apache.org> on 2023/05/25 22:22:20 UTC

[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7703: Spark 3.4: Support fanout writers in SparkPositionDeltaWrite

szehon-ho commented on code in PR #7703:
URL: https://github.com/apache/iceberg/pull/7703#discussion_r1206054291


##########
spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java:
##########
@@ -1805,23 +1805,32 @@ public void testRangeCopyOnWriteMergePartitionedSortedTable() {
   // -------------------------------------------------------------------------
   // delete mode is NOT SET -> CLUSTER BY _spec_id, _partition, _file +
   //                           LOCALLY ORDERED BY _spec_id, _partition, _file, _pos
+  // delete mode is NOT SET (fanout) -> CLUSTER BY _spec_id, _partition, _file + empty ordering

Review Comment:
   It's probably annoying to add another example with ORDERED BY, but is the comment still correct with "ORDERED" above? Won't it be an empty ordering?
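   The two comment lines in the hunk describe how the required distribution and ordering change with the writer mode when delete mode is not set. Here is a minimal sketch of that mapping. It assumes nothing beyond what those two lines state. The class and method names are hypothetical, not Iceberg's API, and the sketch does not cover the ORDERED BY (range) case raised in the question:

```java
import java.util.List;

public class DeleteOrderingSketch {
    // Hypothetical: columns the delete input is clustered by (the same in both modes).
    static List<String> clustering() {
        return List.of("_spec_id", "_partition", "_file");
    }

    // Hypothetical: local sort requested before writing deletes.
    // Per the test comment, fanout mode requests an empty ordering.
    static List<String> localOrdering(boolean fanoutEnabled) {
        if (fanoutEnabled) {
            return List.of();
        }
        return List.of("_spec_id", "_partition", "_file", "_pos");
    }

    public static void main(String[] args) {
        for (boolean fanout : new boolean[] {false, true}) {
            System.out.println("fanout=" + fanout
                + " -> CLUSTER BY " + clustering()
                + ", LOCALLY ORDERED BY " + localOrdering(fanout));
        }
    }
}
```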



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -389,10 +391,44 @@ protected Map<Integer, StructProjection> buildPartitionProjections(
 
       return partitionProjections;
     }
+
+    // use the fanout writer only if enabled and the input is unordered
+    protected PartitioningWriter<InternalRow, DataWriteResult> newDataWriter(
+        Table table, SparkFileWriterFactory writers, OutputFileFactory files, Context context) {
+
+      FileIO io = table.io();
+      boolean fanoutEnabled = context.fanoutWriterEnabled();
+      boolean inputOrdered = context.inputOrdered();
+      long targetFileSize = context.targetDataFileSize();
+
+      if (fanoutEnabled && !inputOrdered) {
+        return new FanoutDataWriter<>(writers, files, io, targetFileSize);
+      } else {
+        return new ClusteredDataWriter<>(writers, files, io, targetFileSize);
+      }
+    }
+
+    // the spec requires position deletes to be ordered by file and position
+    // use a fanout writer if the input is unordered no matter whether fanout writers are enabled
+    // clustered writers only validate records for the same spec/paritition are co-located,

Review Comment:
   Optional: "clustered writers only validate that records for the same spec/partition are co-located, which is not enough for position deletes" => "clustered writers assume that the position deletes are already ordered by file and position"?
   
   (The validation point seems a bit orthogonal to the previous sentence. I don't mind also mentioning it somewhere, but it doesn't seem to be in the other comment in newDataWriter, so I think it's better to omit it.)
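   The newDataWriter logic in the diff picks a fanout writer only when fanout is enabled and the input is unordered. The toy sketch below shows why, using plain Java collections. It is not Iceberg's FanoutDataWriter or ClusteredDataWriter, and all names are hypothetical. The clustered strategy keeps one open output at a time and rejects a partition that reappears after it was closed. The fanout strategy keeps one open output per partition and accepts any input order.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class WriterSketch {
    // Hypothetical row: a value tagged with its partition key.
    record Row(String partition, int value) {}

    // Clustered strategy: one open "file" at a time. Once the writer moves on from a
    // partition, that partition must not appear again, otherwise the input was not clustered.
    static Map<String, List<Integer>> writeClustered(List<Row> rows) {
        Map<String, List<Integer>> files = new LinkedHashMap<>();
        Set<String> completed = new HashSet<>();
        String current = null;
        for (Row row : rows) {
            if (!row.partition().equals(current)) {
                if (completed.contains(row.partition())) {
                    throw new IllegalStateException(
                        "input is not clustered: partition " + row.partition() + " was already closed");
                }
                if (current != null) {
                    completed.add(current); // "close" the previous file
                }
                current = row.partition();
                files.put(current, new ArrayList<>());
            }
            files.get(current).add(row.value());
        }
        return files;
    }

    // Fanout strategy: keep one open "file" per partition, so input order does not matter.
    static Map<String, List<Integer>> writeFanout(List<Row> rows) {
        Map<String, List<Integer>> files = new LinkedHashMap<>();
        for (Row row : rows) {
            files.computeIfAbsent(row.partition(), p -> new ArrayList<>()).add(row.value());
        }
        return files;
    }

    // Mirrors the condition in newDataWriter: fanout only if enabled and the input is unordered.
    static String chooseWriter(boolean fanoutEnabled, boolean inputOrdered) {
        return fanoutEnabled && !inputOrdered ? "fanout" : "clustered";
    }

    public static void main(String[] args) {
        List<Row> unordered = List.of(new Row("a", 1), new Row("b", 2), new Row("a", 3));
        System.out.println(writeFanout(unordered)); // prints {a=[1, 3], b=[2]}
        try {
            writeClustered(unordered);
        } catch (IllegalStateException e) {
            System.out.println("clustered writer rejected unordered input");
        }
    }
}
```

   In this sketch, fanout tolerates any input order but holds one open output per partition at once. Clustered writing keeps only one open, but it depends on the input being clustered.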

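   The position-delete comment in the diff rests on the requirement that position deletes be ordered by file and position. The toy sketch below illustrates what that implies, assuming deletes are plain (path, position) pairs. A fanout-style writer can buffer and sort before writing. A clustered-style writer can only stream deletes that already arrive in order. All names are hypothetical, and this is not how Iceberg's delete writers are implemented internally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class PositionDeleteSketch {
    // Hypothetical position delete: data file path plus row position within that file.
    record PositionDelete(String path, long pos) {}

    // Fanout-style strategy: buffer the deletes, then emit them sorted by file path and
    // position, so the output is ordered regardless of the input order.
    static List<PositionDelete> sortForWrite(List<PositionDelete> deletes) {
        List<PositionDelete> sorted = new ArrayList<>(deletes);
        sorted.sort(Comparator.comparing(PositionDelete::path).thenComparingLong(PositionDelete::pos));
        return sorted;
    }

    // A clustered-style strategy can only stream deletes through if this already holds.
    static boolean isOrdered(List<PositionDelete> deletes) {
        for (int i = 1; i < deletes.size(); i++) {
            PositionDelete prev = deletes.get(i - 1);
            PositionDelete cur = deletes.get(i);
            int byPath = prev.path().compareTo(cur.path());
            if (byPath > 0 || (byPath == 0 && prev.pos() > cur.pos())) {
                return false;
            }
        }
        return true;
    }

    // Mirrors the diff comment: unordered input gets a fanout writer,
    // regardless of whether fanout writers are enabled.
    static String chooseDeleteWriter(boolean inputOrdered) {
        return inputOrdered ? "clustered" : "fanout";
    }

    public static void main(String[] args) {
        List<PositionDelete> input = List.of(
            new PositionDelete("file-b", 7), new PositionDelete("file-a", 3), new PositionDelete("file-a", 1));
        System.out.println(isOrdered(input) + " -> " + chooseDeleteWriter(isOrdered(input)));
        System.out.println(sortForWrite(input));
    }
}
```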


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
