You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2021/04/21 16:56:34 UTC

[GitHub] [iceberg] aokolnychyi commented on a change in pull request #2501: API: API For CompactDataFiles and DataCompactionStrategy

aokolnychyi commented on a change in pull request #2501:
URL: https://github.com/apache/iceberg/pull/2501#discussion_r617688545



##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";

Review comment:
       Iceberg uses `-` instead of `_` in table props and read/write options. Shall we follow that here too?

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;
+
+  /**
+   * The parallelism level to use when processing chunks generated by the Compaction Strategy. The structure and
+   * contents of the chunk is determined by the Compaction Strategy and options passed to it. When running each job will
+   * be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return this for chaining
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
+   *
+   * @param strategyName name of the strategy
+   * @return this for chaining
+   */
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the Compaction strategy. This will be used
+   * in addition to whatever rules the Compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  /**
+   * A pairing of Chunk information to the result of that chunk's results. If the results are null then that particular
+   * chunk failed. We should only have failed chunks if partial progress is enabled.
+   */
+  class Result {

Review comment:
       I think we either have to switch to classes in all actions or continue to use interfaces.
   
   Maybe, having result implementations in `core` is overkill and we can move them to actions directly. I just took a look, and I think that will be alright. Probably, we won't need engine-specific results.
   
   Thoughts, @RussellSpitzer @rdblue @openinx?

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {

Review comment:
       Do we want to offer this under a new name? I thought this would swap the `RewriteDataFiles` implementation in the new actions API. Right now, `RewriteDataFiles` from the new API is not implemented, so we don't have to update any other place; we can just move the new logic there.
   
   Plus, I tend to think `rewrite` fits slightly better as we do not necessarily compact. We can split files or recluster them, for example.

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";

Review comment:
       These may be specific to bin-packing. Shall we wait a bit until we know what the sort-based compaction is going to look like? We can always add them here.

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes

Review comment:
       nit: `...SIZE_BYTES_DEFAULT`?

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.

Review comment:
       Shall we mention the default?

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;

Review comment:
       I'd pick the default value based on what would work best in most cases as we are trying to minimize the input from the user. I know we set this as 1 due to a concern that a rewrite may be a no-op otherwise. However, is there any realistic use case where users may want to always compact 2 tiny files? If the user would need to always change this value, I'd pick another default value.

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;
+
+  /**
+   * The parallelism level to use when processing chunks generated by the Compaction Strategy. The structure and
+   * contents of the chunk is determined by the Compaction Strategy and options passed to it. When running each job will
+   * be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return this for chaining
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
+   *
+   * @param strategyName name of the strategy
+   * @return this for chaining
+   */
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the Compaction strategy. This will be used
+   * in addition to whatever rules the Compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  /**
+   * A pairing of Chunk information to the result of that chunk's results. If the results are null then that particular
+   * chunk failed. We should only have failed chunks if partial progress is enabled.
+   */
+  class Result {
+    private Map<CompactionChunkInfo, CompactionChunkResult> resultMap;
+
+
+    public Result(Map<CompactionChunkInfo, CompactionChunkResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionChunkInfo, CompactionChunkResult> resultMap() {
+      return resultMap;
+    }
+  }
+
+  interface CompactionChunkResult {
+
+    int numFilesAdded();
+
+    int numFilesRemoved();
+  }
+
+  interface CompactionChunkInfo {
+
+    /**
+     * returns which chunk this is out of the total set of chunks for this compaction
+     */
+    int chunkIndex();

Review comment:
       Is it global or per partition?

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";

Review comment:
       `TARGET_FILE_SIZE_BYTES`?

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;
+
+  /**
+   * The parallelism level to use when processing chunks generated by the Compaction Strategy. The structure and
+   * contents of the chunk is determined by the Compaction Strategy and options passed to it. When running each job will
+   * be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return this for chaining
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
+   *
+   * @param strategyName name of the strategy
+   * @return this for chaining
+   */
+  CompactDataFiles compactionStrategy(String strategyName);

Review comment:
       `compactionStrategy` -> `strategy`?

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not be more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;
+
+  /**
+   * The parallelism level to use when processing chunks generated by the Compaction Strategy. The structure and
+   * contents of the chunk is determined by the Compaction Strategy and options passed to it. When running each job will
+   * be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return this for chaining
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
+   *
+   * @param strategyName name of the strategy
+   * @return this for chaining
+   */
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the Compaction strategy. This will be used
+   * in addition to whatever rules the Compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  /**
+   * A pairing of chunk information to that chunk's results. If the results are null then that particular
+   * chunk failed. We should only have failed chunks if partial progress is enabled.
+   */
+  class Result {
+    private Map<CompactionChunkInfo, CompactionChunkResult> resultMap;
+
+
+    public Result(Map<CompactionChunkInfo, CompactionChunkResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionChunkInfo, CompactionChunkResult> resultMap() {

Review comment:
       `chunkResults` or something?

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not be more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;
+
+  /**
+   * The parallelism level to use when processing chunks generated by the Compaction Strategy. The structure and
+   * contents of the chunk is determined by the Compaction Strategy and options passed to it. When running each job will
+   * be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return this for chaining
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
+   *
+   * @param strategyName name of the strategy
+   * @return this for chaining
+   */
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the Compaction strategy. This will be used
+   * in addition to whatever rules the Compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  /**
+   * A pairing of chunk information to that chunk's results. If the results are null then that particular
+   * chunk failed. We should only have failed chunks if partial progress is enabled.
+   */
+  class Result {
+    private Map<CompactionChunkInfo, CompactionChunkResult> resultMap;
+
+
+    public Result(Map<CompactionChunkInfo, CompactionChunkResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionChunkInfo, CompactionChunkResult> resultMap() {
+      return resultMap;
+    }
+  }
+
+  interface CompactionChunkResult {
+
+    int numFilesAdded();
+
+    int numFilesRemoved();
+  }
+
+  interface CompactionChunkInfo {
+
+    /**
+     * returns which chunk this is out of the total set of chunks for this compaction
+     */
+    int chunkIndex();
+
+    /**
+     * returns which chunk this is out of the set of chunks for this partition
+     */
+    int partitionIndex();
+
+    /**
+     * returns which partition this chunk contains files from
+     */
+    String partition();

Review comment:
       Should it be `StructLike` instead? Or how will we derive the string?

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not be more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;
+
+  /**
+   * The parallelism level to use when processing chunks generated by the Compaction Strategy. The structure and
+   * contents of the chunk is determined by the Compaction Strategy and options passed to it. When running each job will
+   * be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return this for chaining
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
+   *
+   * @param strategyName name of the strategy
+   * @return this for chaining
+   */
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the Compaction strategy. This will be used
+   * in addition to whatever rules the Compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  /**
+   * A pairing of chunk information to that chunk's results. If the results are null then that particular
+   * chunk failed. We should only have failed chunks if partial progress is enabled.
+   */
+  class Result {
+    private Map<CompactionChunkInfo, CompactionChunkResult> resultMap;
+
+

Review comment:
       nit: extra line

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not be more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;
+
+  /**
+   * The parallelism level to use when processing chunks generated by the Compaction Strategy. The structure and
+   * contents of the chunk is determined by the Compaction Strategy and options passed to it. When running each job will
+   * be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return this for chaining
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
+   *
+   * @param strategyName name of the strategy
+   * @return this for chaining
+   */
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the Compaction strategy. This will be used
+   * in addition to whatever rules the Compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  /**
+   * A pairing of chunk information to that chunk's results. If the results are null then that particular
+   * chunk failed. We should only have failed chunks if partial progress is enabled.
+   */
+  class Result {
+    private Map<CompactionChunkInfo, CompactionChunkResult> resultMap;
+
+
+    public Result(Map<CompactionChunkInfo, CompactionChunkResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionChunkInfo, CompactionChunkResult> resultMap() {
+      return resultMap;
+    }
+  }
+
+  interface CompactionChunkResult {
+
+    int numFilesAdded();
+
+    int numFilesRemoved();

Review comment:
       I think we use `xxxCount` frequently. Shall it be `rewrittenDataFilesCount`?

##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+
+public interface DataCompactionStrategy extends Serializable {

Review comment:
       Do we want to make it public in the first iteration? We could make it non-public in `core` to start with.

##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/BinPack.java
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+public interface BinPack {

Review comment:
       Would it make sense to add this one once we have a bit more clarity on the implementation? Right now, it is only used in the default compaction strategy constant. We can probably add it later and focus on the common API for now.
   
   Just asking.

##########
File path: api/src/main/java/org/apache/iceberg/actions/CompactDataFiles.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import org.apache.iceberg.actions.compaction.BinPack;
+import org.apache.iceberg.expressions.Expression;
+
+public interface CompactDataFiles extends Action<CompactDataFiles, CompactDataFiles.Result> {
+
+  /**
+   * Enable committing groups of chunks prior to the entire compaction completing. This will produce additional commits
+   * but allow for progress even if some chunks fail to commit. The default is false, which produces a single commit
+   * when all chunks have completed.
+   */
+  String PARTIAL_PROGRESS_ENABLED = "partial_progress.enabled";
+  boolean PARTIAL_PROGRESS_ENABLED_DEFAULT = false;
+
+  /**
+   * The maximum amount of commits that compaction is allowed to produce if partial progress is enabled.
+   */
+  String PARTIAL_PROGRESS_MAX_COMMITS = "partial_progress.max_commits";
+  int PARTIAL_PROGRESS_MAX_COMMITS_DEFAULT = 10;
+
+  String COMPACTION_STRATEGY_DEFAULT = BinPack.NAME;
+
+  /**
+   * The largest amount of data that should be compacted in a single chunk by the underlying framework. This bounds the
+   * amount of data that would be used in a single shuffle for example.
+   */
+  String MAX_CHUNK_SIZE_BYTES = "max_chunk_size_bytes";
+  long MAX_CHUNK_SIZE_DEFAULT = 1024L * 1024L * 1024L * 100L; // 100 Gigabytes
+
+  /**
+   * The file size that this compaction strategy will attempt to generate when rewriting files.
+   */
+  String TARGET_FILE_SIZE = "target_file_size";
+
+  /**
+   * A threshold for preventing compaction of partitions whose output will not be more than MIN_COMPACTION_OUTPUT_FILES
+   * files.
+   */
+  String MIN_COMPACTION_OUTPUT_FILES = "min_output_files";
+  int MIN_COMPACTION_OUTPUT_FILES_DEFAULT = 1;
+
+  /**
+   * A threshold for preventing compaction of partitions which do not have more than MIN_COMPACTION_INPUT_FILES to
+   * compact.
+   */
+  String MIN_COMPACTION_INPUT_FILES = "min_input_files";
+  int MIN_COMPACTION_INPUT_FILES_DEFAULT = 1;
+
+  /**
+   * The parallelism level to use when processing chunks generated by the Compaction Strategy. The structure and
+   * contents of the chunk is determined by the Compaction Strategy and options passed to it. When running each job will
+   * be run independently and asynchronously.
+   *
+   * @param par number of concurrent jobs
+   * @return this for chaining
+   */
+  CompactDataFiles parallelism(int par);
+
+  /**
+   * The name of the compaction strategy to be used when compacting data files. Currently we only support BINPACK and
+   * SORT as options.
+   *
+   * @param strategyName name of the strategy
+   * @return this for chaining
+   */
+  CompactDataFiles compactionStrategy(String strategyName);
+
+  /**
+   * A user provided filter for determining which files will be considered by the Compaction strategy. This will be used
+   * in addition to whatever rules the Compaction strategy generates. For example this would be used for providing a
+   * restriction to only run compaction on a specific partition.
+   *
+   * @param expression only entries that pass this filter will be compacted
+   * @return this for chaining
+   */
+  CompactDataFiles filter(Expression expression);
+
+  /**
+   * A pairing of chunk information to that chunk's results. If the results are null then that particular
+   * chunk failed. We should only have failed chunks if partial progress is enabled.
+   */
+  class Result {
+    private Map<CompactionChunkInfo, CompactionChunkResult> resultMap;
+
+
+    public Result(Map<CompactionChunkInfo, CompactionChunkResult> resultMap) {
+      this.resultMap = resultMap;
+    }
+
+    public Map<CompactionChunkInfo, CompactionChunkResult> resultMap() {
+      return resultMap;
+    }
+  }
+
+  interface CompactionChunkResult {
+
+    int numFilesAdded();

Review comment:
       `addedDataFilesCount`?

##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+
+public interface DataCompactionStrategy extends Serializable {
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression to
+   * filter the FileScanTasks which will be created when planning file reads that will later be run through this
+   * compaction strategy. This would be used for pushing down filter expressions to the manifest entry scanning phase.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  Expression preFilter();
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);

Review comment:
       Do we think working with `Iterator` would be easier than with `Iterable`? Just out of curiosity.

##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+
+public interface DataCompactionStrategy extends Serializable {
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  DataCompactionStrategy withOptions(Map<String, String> options);

Review comment:
       Do we need a method in the action to pass the options? I assume the user won't be instantiating the strategy for now.

##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+
+public interface DataCompactionStrategy extends Serializable {
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression to
+   * filter the FileScanTasks which will be created when planning file reads that will later be run through this
+   * compaction strategy. This would be used for pushing down filter expressions to the manifest entry scanning phase.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  Expression preFilter();
+
+  /**
+   * Removes all file references which this plan will not rewrite or change. Unlike the preFilter, this method can
+   * execute arbitrary code and isn't restricted to just Iceberg Expressions. This should be serializable so that
+   * Actions which run remotely can utilize the method.
+   *
+   * @param dataFiles iterator of live datafiles in a given partition
+   * @return iterator containing only files to be rewritten
+   */
+  Iterator<FileScanTask> filesToCompact(Iterator<FileScanTask> dataFiles);
+
+  /**
+   * Groups file scans into lists which will be processed in a single executable unit. Each group will end up being
+   * committed as an independent set of changes. This creates the jobs which will eventually be run by the underlying
+   * Action.
+   *
+   * @param dataFiles iterator of files to be rewritten
+   * @return iterator of sets of files to be processed together
+   */
+  Iterator<List<FileScanTask>> groupFilesIntoChunks(Iterator<FileScanTask> dataFiles);
+
+  /**
+   * Method which will rewrite files based on this particular DataCompactionStrategy's Algorithm.
+   * This will most likely be Action framework specific.
+   *
+   * @param table          table being modified
+   * @param filesToRewrite a group of files to be rewritten together
+   * @return a list of newly written files
+   */
+  List<DataFile> rewriteFiles(Table table, List<FileScanTask> filesToRewrite);

Review comment:
       Question: `RewriteFiles` needs a set. Shall we return a set here to avoid any conversions later?

##########
File path: api/src/main/java/org/apache/iceberg/actions/compaction/DataCompactionStrategy.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions.compaction;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.Expression;
+
+public interface DataCompactionStrategy extends Serializable {
+  /**
+   * Returns the name of this compaction strategy
+   */
+  String name();
+
+  /**
+   * Returns a set of options which this compaction strategy can use. This is an allowed-list and any options not
+   * specified here will be rejected at runtime.
+   */
+  Set<String> validOptions();
+
+  DataCompactionStrategy withOptions(Map<String, String> options);
+
+  /**
+   * Before the compaction strategy rules are applied, the underlying action has the ability to use this expression to
+   * filter the FileScanTasks which will be created when planning file reads that will later be run through this
+   * compaction strategy. This would be used for pushing down filter expressions to the manifest entry scanning phase.
+   *
+   * @return an Iceberg expression to use when discovering file scan tasks
+   */
+  Expression preFilter();

Review comment:
       Can this be only a data filter?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org