You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/05/06 05:11:17 UTC

[GitHub] [iceberg] kbendick commented on a change in pull request #2501: API: API For CompactDataFiles and DataCompactionStrategy

kbendick commented on a change in pull request #2501:
URL: https://github.com/apache/iceberg/pull/2501#discussion_r627081408



##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the entire compaction completing.
+   * This will produce additional commits but allow for progress even if some groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files which will be included in a single file
+   * group. A group will be processed by a single framework "action". For example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = "max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the next file split into a task.
+   * @param strategyName name of the strategy
    * @return this for method chaining
    */
-  RewriteDataFiles splitLookback(int splitLookback);
+  RewriteDataFiles strategy(RewriteStrategyName strategyName);
 
   /**
-   * Specify the cost of opening a file that will be taken into account during packing files into
-   * bins. If the size of the file is smaller than the cost of opening, then this value will be used
-   * instead of the actual file size.
-   * <p>
-   * If not set, defaults to the table's open file cost.
+   * A user provided filter for determining which files will be considered by the compaction strategy. This will be used
+   * in addition to whatever rules the compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
    *
-   * @param splitOpenFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
    */
-  RewriteDataFiles splitOpenFileCost(long splitOpenFileCost);
+  RewriteDataFiles filter(Expression expression);
 
   /**
-   * The action result that contains a summary of the execution.
+   * A pairing of file group information to the result of the rewriting that file group. If the results are null then
+   * that particular file group failed. We should only have failed groups if partial progress is enabled otherwise we
+   * will report a total failure for the job.
    */
   interface Result {
+    Map<FileGroupInfo, FileGroupRewriteResult> resultMap();
+  }

Review comment:
       I may have missed this in the discussion, but should we return something other than null for a failure? I can imagine wanting to return metadata about the failure, such as trimmed stack traces or something else specific to an individual compaction strategy.
   
   I prefer explicit error responses over null for errors, since null is much easier to misinterpret or miss entirely, which then leads to an NPE.

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the entire compaction completing.
+   * This will produce additional commits but allow for progress even if some groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files which will be included in a single file
+   * group. A group will be processed by a single framework "action". For example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = "max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the next file split into a task.
+   * @param strategyName name of the strategy

Review comment:
       Maybe "name of the compaction strategy" so that the param definition isn't _exactly_ the param name?
   
   Though I recognize this is partially to appease the linter, so this might be too nit-picky on my part. 🙂 

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the entire compaction completing.
+   * This will produce additional commits but allow for progress even if some groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files which will be included in a single file
+   * group. A group will be processed by a single framework "action". For example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = "max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";

Review comment:
       +1 on keeping `partition` in the term. 

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the entire compaction completing.
+   * This will produce additional commits but allow for progress even if some groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files which will be included in a single file
+   * group. A group will be processed by a single framework "action". For example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = "max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the next file split into a task.
+   * @param strategyName name of the strategy
    * @return this for method chaining
    */
-  RewriteDataFiles splitLookback(int splitLookback);
+  RewriteDataFiles strategy(RewriteStrategyName strategyName);
 
   /**
-   * Specify the cost of opening a file that will be taken into account during packing files into
-   * bins. If the size of the file is smaller than the cost of opening, then this value will be used
-   * instead of the actual file size.
-   * <p>
-   * If not set, defaults to the table's open file cost.
+   * A user provided filter for determining which files will be considered by the compaction strategy. This will be used
+   * in addition to whatever rules the compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
    *
-   * @param splitOpenFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
    */
-  RewriteDataFiles splitOpenFileCost(long splitOpenFileCost);
+  RewriteDataFiles filter(Expression expression);
 
   /**
-   * The action result that contains a summary of the execution.
+   * A pairing of file group information to the result of the rewriting that file group. If the results are null then

Review comment:
       Nit: to me, "pairing" makes it sound like there's only one entry in the map. Maybe just "a map"?

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the entire compaction completing.
+   * This will produce additional commits but allow for progress even if some groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files which will be included in a single file
+   * group. A group will be processed by a single framework "action". For example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = "max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table being updated.

Review comment:
       +1. Aligns with many other similar configs and is more explicit.

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the entire compaction completing.
+   * This will produce additional commits but allow for progress even if some groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files which will be included in a single file
+   * group. A group will be processed by a single framework "action". For example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = "max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the next file split into a task.
+   * @param strategyName name of the strategy
    * @return this for method chaining
    */
-  RewriteDataFiles splitLookback(int splitLookback);
+  RewriteDataFiles strategy(RewriteStrategyName strategyName);
 
   /**
-   * Specify the cost of opening a file that will be taken into account during packing files into
-   * bins. If the size of the file is smaller than the cost of opening, then this value will be used
-   * instead of the actual file size.
-   * <p>
-   * If not set, defaults to the table's open file cost.
+   * A user provided filter for determining which files will be considered by the compaction strategy. This will be used
+   * in addition to whatever rules the compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
    *
-   * @param splitOpenFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
    */
-  RewriteDataFiles splitOpenFileCost(long splitOpenFileCost);
+  RewriteDataFiles filter(Expression expression);
 
   /**
-   * The action result that contains a summary of the execution.
+   * A pairing of file group information to the result of the rewriting that file group. If the results are null then
+   * that particular file group failed. We should only have failed groups if partial progress is enabled otherwise we
+   * will report a total failure for the job.
    */
   interface Result {
+    Map<FileGroupInfo, FileGroupRewriteResult> resultMap();
+  }
+
+  interface FileGroupRewriteResult {
+    int addedDataFilesCount();
+
+    int rewrittenDataFilesCount();
+  }
+
+  /**
+   * A description of a file group, when it was processed, and within which partition. For use
+   * tracking rewrite operations and for returning results.
+   */
+  interface FileGroupInfo {
+
+    /**
+     * returns which file group this is out of the total set of file groups for this compaction
+     */
+    int globalIndex();

Review comment:
       Correct me if I'm wrong, but I think that since this is `FileGroupInfo` and not the Result, the indexes are likely included so that the engine can assign and track work, etc., as opposed to purely for tracking on output.

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the entire compaction completing.
+   * This will produce additional commits but allow for progress even if some groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files which will be included in a single file
+   * group. A group will be processed by a single framework "action". For example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = "max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.

Review comment:
       Is the `currently only support options BINPACK and SORT options` comment necessary? To me, what's supported is a natural extension of the enum. And then if we declare an enum value that's not implemented, we can specify that. Feels like less to maintain / less likely to have the comments drift from the reality, but I'd like to hear what others have to say (and it might be a non-issue).

##########
File path: api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
##########
@@ -19,92 +19,120 @@
 
 package org.apache.iceberg.actions;
 
-import org.apache.iceberg.DataFile;
+import java.util.Map;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.expressions.Expression;
 
 /**
- * An action that rewrites data files.
+ * An action for rewriting datafiles according to a Rewrite Strategy. Generally used for
+ * optimizing the sizing and layout of datafiles within a table.
  */
-public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
+public interface RewriteDataFiles extends Action<RewriteDataFiles, RewriteDataFiles.Result> {
+
   /**
-   * Pass a row filter to filter {@link DataFile}s to be rewritten.
-   * <p>
-   * Note that all files that may contain data matching the filter may be rewritten.
-   * <p>
-   * If not set, all files will be rewritten.
-   *
-   * @param expr a row filter to filter out data files
-   * @return this for method chaining
+   * Enable committing groups of files (see max-file-group-size) prior to the entire compaction completing.
+   * This will produce additional commits but allow for progress even if some groups fail to commit. This setting
+   * will not change the correctness of the rewrite operation. The default is false, which produces a single commit
+   * when the entire job has completed.
    */
-  RewriteDataFiles filter(Expression expr);
+  String PARTIAL_PROGRESS_ENABLED = "partial-progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
 
   /**
-   * Enables or disables case sensitive expression binding.
-   * <p>
-   * If not set, defaults to false.
-   *
-   * @param caseSensitive caseSensitive
-   * @return this for method chaining
+   * The maximum amount of Iceberg commits that compaction is allowed to produce if partial progress is enabled.
    */
-  RewriteDataFiles caseSensitive(boolean caseSensitive);
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial-progress.max-commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
 
   /**
-   * Pass a PartitionSpec id to specify which PartitionSpec should be used in DataFile rewrite
-   * <p>
-   * If not set, defaults to the table's default spec ID.
-   *
-   * @param specId PartitionSpec id to rewrite
-   * @return this for method chaining
+   * The entire compaction operation is broken down into pieces based on partitioning and within partitions based
+   * on size into groups. These sub-units of compaction are referred to as file groups. The largest amount of data that
+   * should be compacted in a single group is controlled by MAX_FILE_GROUP_SIZE_BYTES. When grouping files, the
+   * underlying compaction strategy will use this value as to limit the files which will be included in a single file
+   * group. A group will be processed by a single framework "action". For example, in Spark this means that each group
+   * would be rewritten in its own Spark action. A group will never contain files for multiple output partitions.
    */
-  RewriteDataFiles outputSpecId(int specId);
+  String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+  long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
 
   /**
-   * Specify the target data file size in bytes.
-   * <p>
-   * If not set, defaults to the table's target file size.
-   *
-   * @param targetSizeInBytes size in bytes of rewrite data file
-   * @return this for method chaining
+   * The max number of file groups to be simultaneously rewritten by the compaction strategy. The structure and
+   * contents of the group is determined by the compaction strategy. Each file group will be rewritten
+   * independently and asynchronously.
+   **/
+  String MAX_CONCURRENT_FILE_GROUP_ACTIONS = "max-concurrent-file-group-actions";
+  int MAX_CONCURRENT_FILE_GROUP_ACTIONS_DEFAULT = 1;
+
+  /**
+   * The output file size that this compaction strategy will attempt to generate when rewriting files. By default this
+   * will use the write.target-size value in the table properties of the table being updated.
+   */
+  String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * The partition spec to use when writing the output data from this operation. By default uses the
+   * current table partition spec.
    */
-  RewriteDataFiles targetSizeInBytes(long targetSizeInBytes);
+  String PARTITION_SPEC_ID = "partition-spec-id";
+
+  enum RewriteStrategyName {
+    BINPACK,
+    SORT
+  }
 
   /**
-   * Specify the number of "bins" considered when trying to pack the next file split into a task. Increasing this
-   * usually makes tasks a bit more even by considering more ways to pack file regions into a single task with extra
-   * planning cost.
-   * <p>
-   * This configuration can reorder the incoming file regions, to preserve order for lower/upper bounds in file
-   * metadata, user can use a lookback of 1.
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
    *
-   * @param splitLookback number of "bins" considered when trying to pack the next file split into a task.
+   * @param strategyName name of the strategy
    * @return this for method chaining
    */
-  RewriteDataFiles splitLookback(int splitLookback);
+  RewriteDataFiles strategy(RewriteStrategyName strategyName);
 
   /**
-   * Specify the cost of opening a file that will be taken into account during packing files into
-   * bins. If the size of the file is smaller than the cost of opening, then this value will be used
-   * instead of the actual file size.
-   * <p>
-   * If not set, defaults to the table's open file cost.
+   * A user provided filter for determining which files will be considered by the compaction strategy. This will be used
+   * in addition to whatever rules the compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
    *
-   * @param splitOpenFileCost minimum file size to count to pack into one "bin".
-   * @return this for method chaining
+   * @param expression only entries that pass this filter will be compacted

Review comment:
       Nit: Possibly start the param definition with the noun or focus on the expression rather than the result? Like `Filter expression used to select entries for compaction` or something? Reads a little funny to me but I might not have had enough coffee today. 🙂 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org