You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/04/13 18:16:32 UTC

[iceberg] branch master updated: Core, Spark 3.3: Add FileRewriter API (#7175)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f53c4eeec5 Core, Spark 3.3: Add FileRewriter API (#7175)
f53c4eeec5 is described below

commit f53c4eeec5dc187957264bf2f7a6675caf9054e2
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Apr 13 11:16:25 2023 -0700

    Core, Spark 3.3: Add FileRewriter API (#7175)
---
 .../apache/iceberg/actions/BinPackStrategy.java    |   3 +
 .../org/apache/iceberg/actions/FileRewriter.java   |  77 ++++
 .../apache/iceberg/actions/RewriteStrategy.java    |   6 +
 .../iceberg/actions/SizeBasedDataRewriter.java     | 109 ++++++
 .../iceberg/actions/SizeBasedFileRewriter.java     | 301 ++++++++++++++++
 .../org/apache/iceberg/actions/SortStrategy.java   |   3 +
 .../extensions/TestRewriteDataFilesProcedure.java  |   6 +-
 .../spark/actions/RewriteDataFilesSparkAction.java |  66 ++--
 .../spark/actions/SparkBinPackDataRewriter.java    |  82 +++++
 .../spark/actions/SparkBinPackStrategy.java        |   6 +
 .../spark/actions/SparkShufflingDataRewriter.java  | 138 +++++++
 .../spark/actions/SparkSizeBasedDataRewriter.java  |  67 ++++
 .../spark/actions/SparkSortDataRewriter.java       |  60 ++++
 .../iceberg/spark/actions/SparkSortStrategy.java   |   6 +
 .../spark/actions/SparkZOrderDataRewriter.java     | 184 ++++++++++
 .../iceberg/spark/actions/SparkZOrderStrategy.java |   6 +
 .../spark/actions/TestRewriteDataFilesAction.java  |  12 +-
 .../spark/actions/TestSparkFileRewriter.java       | 396 +++++++++++++++++++++
 18 files changed, 1476 insertions(+), 52 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
index 785f5c3ea3..8e0c0b01dd 100644
--- a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
@@ -44,7 +44,10 @@ import org.slf4j.LoggerFactory;
  * RewriteDataFiles#MAX_FILE_GROUP_SIZE_BYTES}. Groups will be considered for rewriting if they
  * contain more files than {@link #MIN_INPUT_FILES} or would produce at least one file of {@link
  * RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
+ *
+ * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SizeBasedFileRewriter} instead.
  */
+@Deprecated
 public abstract class BinPackStrategy implements RewriteStrategy {
 
   private static final Logger LOG = LoggerFactory.getLogger(BinPackStrategy.class);
diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java
new file mode 100644
index 0000000000..7c6b4e8d7e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+
+/**
+ * A class for rewriting content files.
+ *
+ * <p>The entire rewrite operation is broken down into pieces based on partitioning, and size-based
+ * groups within a partition. These subunits of the rewrite are referred to as file groups. A file
+ * group will be processed by a single framework "action". For example, in Spark this means that
+ * each group would be rewritten in its own Spark job.
+ *
+ * @param <T> the Java type of tasks to read content files
+ * @param <F> the Java type of content files
+ */
+public interface FileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>> {
+
+  /** Returns a description for this rewriter. */
+  default String description() {
+    return getClass().getName();
+  }
+
+  /**
+   * Returns a set of supported options for this rewriter. Only options specified in this set will
+   * be accepted at runtime. Any other options will be rejected.
+   */
+  Set<String> validOptions();
+
+  /**
+   * Initializes this rewriter using provided options.
+   *
+   * @param options options to initialize this rewriter
+   */
+  void init(Map<String, String> options);
+
+  /**
+   * Selects files which this rewriter believes are valid targets to be rewritten based on their
+   * scan tasks and groups those scan tasks into file groups. The file groups are then rewritten in
+   * a single executable unit, such as a Spark job.
+   *
+   * @param tasks an iterable of scan tasks for files in a partition
+   * @return groups of scan tasks for files to be rewritten in a single executable unit
+   */
+  Iterable<List<T>> planFileGroups(Iterable<T> tasks);
+
+  /**
+   * Rewrite a group of files represented by the given list of scan tasks.
+   *
+   * <p>The implementation is supposed to be engine-specific (e.g. Spark, Flink, Trino).
+   *
+   * @param group a group of scan tasks for files to be rewritten together
+   * @return a set of newly written files
+   */
+  Set<F> rewrite(List<T> group);
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java b/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
index 36fc724752..d3a450ddfb 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
@@ -26,6 +26,12 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 
+/**
+ * A strategy for rewriting files.
+ *
+ * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link FileRewriter} instead.
+ */
+@Deprecated
 public interface RewriteStrategy extends Serializable {
   /** Returns the name of this rewrite strategy */
   String name();
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java
new file mode 100644
index 0000000000..e5b5908804
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.PropertyUtil;
+
+public abstract class SizeBasedDataRewriter extends SizeBasedFileRewriter<FileScanTask, DataFile> {
+
+  /**
+   * The minimum number of deletes that needs to be associated with a data file for it to be
+   * considered for rewriting. If a data file has this number of deletes or more, it will be
+   * rewritten regardless of its file size determined by {@link #MIN_FILE_SIZE_BYTES} and {@link
+   * #MAX_FILE_SIZE_BYTES}. If a file group contains a file that satisfies this condition, the file
+   * group will be rewritten regardless of the number of files in the file group determined by
+   * {@link #MIN_INPUT_FILES}.
+   *
+   * <p>Defaults to Integer.MAX_VALUE, which means this feature is not enabled by default.
+   */
+  public static final String DELETE_FILE_THRESHOLD = "delete-file-threshold";
+
+  public static final int DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE;
+
+  private int deleteFileThreshold;
+
+  protected SizeBasedDataRewriter(Table table) {
+    super(table);
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(DELETE_FILE_THRESHOLD)
+        .build();
+  }
+
+  @Override
+  public void init(Map<String, String> options) {
+    super.init(options);
+    this.deleteFileThreshold = deleteFileThreshold(options);
+  }
+
+  @Override
+  protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
+    return Iterables.filter(tasks, task -> wronglySized(task) || tooManyDeletes(task));
+  }
+
+  private boolean tooManyDeletes(FileScanTask task) {
+    return task.deletes() != null && task.deletes().size() >= deleteFileThreshold;
+  }
+
+  @Override
+  protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>> groups) {
+    return Iterables.filter(groups, this::shouldRewrite);
+  }
+
+  private boolean shouldRewrite(List<FileScanTask> group) {
+    return enoughInputFiles(group)
+        || enoughContent(group)
+        || tooMuchContent(group)
+        || anyTaskHasTooManyDeletes(group);
+  }
+
+  private boolean anyTaskHasTooManyDeletes(List<FileScanTask> group) {
+    return group.stream().anyMatch(this::tooManyDeletes);
+  }
+
+  @Override
+  protected long defaultTargetFileSize() {
+    return PropertyUtil.propertyAsLong(
+        table().properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+  }
+
+  private int deleteFileThreshold(Map<String, String> options) {
+    int value =
+        PropertyUtil.propertyAsInt(options, DELETE_FILE_THRESHOLD, DELETE_FILE_THRESHOLD_DEFAULT);
+    Preconditions.checkArgument(
+        value >= 0, "'%s' is set to %s but must be >= 0", DELETE_FILE_THRESHOLD, value);
+    return value;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
new file mode 100644
index 0000000000..6c6880eff6
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A file rewriter that determines which files to rewrite based on their size.
+ *
+ * <p>If files are smaller than the {@link #MIN_FILE_SIZE_BYTES} threshold or larger than the {@link
+ * #MAX_FILE_SIZE_BYTES} threshold, they are considered targets for being rewritten.
+ *
+ * <p>Once selected, files are grouped based on the {@link BinPacking bin-packing algorithm} into
+ * groups of no more than {@link #MAX_FILE_GROUP_SIZE_BYTES}. Groups will be actually rewritten if
+ * they contain at least {@link #MIN_INPUT_FILES} files or if they would produce at least one file
+ * of {@link #TARGET_FILE_SIZE_BYTES}.
+ *
+ * <p>Note that implementations may add extra conditions for selecting files or filtering groups.
+ */
+public abstract class SizeBasedFileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>>
+    implements FileRewriter<T, F> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SizeBasedFileRewriter.class);
+
+  /** The target output file size that this file rewriter will attempt to generate. */
+  public static final String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * Controls which files will be considered for rewriting. Files with sizes under this threshold
+   * will be considered for rewriting regardless of any other criteria.
+   *
+   * <p>Defaults to 75% of the target file size.
+   */
+  public static final String MIN_FILE_SIZE_BYTES = "min-file-size-bytes";
+
+  public static final double MIN_FILE_SIZE_DEFAULT_RATIO = 0.75;
+
+  /**
+   * Controls which files will be considered for rewriting. Files with sizes above this threshold
+   * will be considered for rewriting regardless of any other criteria.
+   *
+   * <p>Defaults to 180% of the target file size.
+   */
+  public static final String MAX_FILE_SIZE_BYTES = "max-file-size-bytes";
+
+  public static final double MAX_FILE_SIZE_DEFAULT_RATIO = 1.80;
+
+  /**
+   * Any file group containing at least this number of files will be rewritten regardless of other
+   * criteria. This config ensures file groups that contain many files are compacted even if the
+   * total size of that group is less than the target file size. This can also be thought of as the
+   * maximum number of wrongly sized files that could remain in a partition after rewriting.
+   */
+  public static final String MIN_INPUT_FILES = "min-input-files";
+
+  public static final int MIN_INPUT_FILES_DEFAULT = 5;
+
+  /** Overrides other options and forces rewriting of all provided files. */
+  public static final String REWRITE_ALL = "rewrite-all";
+
+  public static final boolean REWRITE_ALL_DEFAULT = false;
+
+  /**
+   * This option controls the largest amount of data that should be rewritten in a single file
+   * group. It helps with breaking down the rewriting of very large partitions which may not be
+   * rewritable otherwise due to the resource constraints of the cluster. For example, a sort-based
+   * rewrite may not scale to TB-sized partitions, and those partitions need to be worked on in
+   * small subsections to avoid exhaustion of resources.
+   */
+  public static final String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+
+  public static final long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 100L * 1024 * 1024 * 1024; // 100 GB
+
+  private final Table table;
+  private long targetFileSize;
+  private long minFileSize;
+  private long maxFileSize;
+  private int minInputFiles;
+  private boolean rewriteAll;
+  private long maxGroupSize;
+
+  protected SizeBasedFileRewriter(Table table) {
+    this.table = table;
+  }
+
+  protected abstract long defaultTargetFileSize();
+
+  protected abstract Iterable<T> filterFiles(Iterable<T> tasks);
+
+  protected abstract Iterable<List<T>> filterFileGroups(List<List<T>> groups);
+
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.of(
+        TARGET_FILE_SIZE_BYTES,
+        MIN_FILE_SIZE_BYTES,
+        MAX_FILE_SIZE_BYTES,
+        MIN_INPUT_FILES,
+        REWRITE_ALL,
+        MAX_FILE_GROUP_SIZE_BYTES);
+  }
+
+  @Override
+  public void init(Map<String, String> options) {
+    Map<String, Long> sizeThresholds = sizeThresholds(options);
+    this.targetFileSize = sizeThresholds.get(TARGET_FILE_SIZE_BYTES);
+    this.minFileSize = sizeThresholds.get(MIN_FILE_SIZE_BYTES);
+    this.maxFileSize = sizeThresholds.get(MAX_FILE_SIZE_BYTES);
+
+    this.minInputFiles = minInputFiles(options);
+    this.rewriteAll = rewriteAll(options);
+    this.maxGroupSize = maxGroupSize(options);
+
+    if (rewriteAll) {
+      LOG.info("Configured to rewrite all provided files in table {}", table.name());
+    }
+  }
+
+  protected boolean wronglySized(T task) {
+    return task.length() < minFileSize || task.length() > maxFileSize;
+  }
+
+  @Override
+  public Iterable<List<T>> planFileGroups(Iterable<T> tasks) {
+    Iterable<T> filteredTasks = rewriteAll ? tasks : filterFiles(tasks);
+    BinPacking.ListPacker<T> packer = new BinPacking.ListPacker<>(maxGroupSize, 1, false);
+    List<List<T>> groups = packer.pack(filteredTasks, ContentScanTask::length);
+    return rewriteAll ? groups : filterFileGroups(groups);
+  }
+
+  protected boolean enoughInputFiles(List<T> group) {
+    return group.size() > 1 && group.size() >= minInputFiles;
+  }
+
+  protected boolean enoughContent(List<T> group) {
+    return group.size() > 1 && inputSize(group) > targetFileSize;
+  }
+
+  protected boolean tooMuchContent(List<T> group) {
+    return inputSize(group) > maxFileSize;
+  }
+
+  protected long inputSize(List<T> group) {
+    return group.stream().mapToLong(ContentScanTask::length).sum();
+  }
+
+  /**
+   * Determines the preferable number of output files when rewriting a particular file group.
+   *
+   * <p>If the rewriter is handling 10.1 GB of data with a target file size of 1 GB, it could
+   * produce 11 files, one of which would only have 0.1 GB. This would most likely be less
+   * preferable to 10 files with 1.01 GB each. So this method decides whether to round up or round
+   * down based on what the estimated average file size will be if the remainder (0.1 GB) is
+   * distributed amongst other files. If the new average file size is no more than 10% greater than
+   * the target file size, then this method will round down when determining the number of output
+   * files. Otherwise, the remainder will be written into a separate file.
+   *
+   * @param inputSize a total input size for a file group
+   * @return the number of files this rewriter should create
+   */
+  protected long numOutputFiles(long inputSize) {
+    if (inputSize < targetFileSize) {
+      return 1;
+    }
+
+    long numFilesWithRemainder = LongMath.divide(inputSize, targetFileSize, RoundingMode.CEILING);
+    long numFilesWithoutRemainder = LongMath.divide(inputSize, targetFileSize, RoundingMode.FLOOR);
+    long avgFileSizeWithoutRemainder = inputSize / numFilesWithoutRemainder;
+
+    if (LongMath.mod(inputSize, targetFileSize) > minFileSize) {
+      // the remainder file is of a valid size for this rewrite so keep it
+      return numFilesWithRemainder;
+
+    } else if (avgFileSizeWithoutRemainder < Math.min(1.1 * targetFileSize, writeMaxFileSize())) {
+      // if the remainder is distributed amongst other files,
+      // the average file size will be no more than 10% bigger than the target file size
+      // so round down and distribute remainder amongst other files
+      return numFilesWithoutRemainder;
+
+    } else {
+      // keep the remainder file as it is not OK to distribute it amongst other files
+      return numFilesWithRemainder;
+    }
+  }
+
+  /**
+   * Estimates a larger max target file size than the target size used in task creation to avoid
+   * creating tiny remainder files.
+   *
+   * <p>While we create tasks that should all be smaller than our target size, there is a chance
+   * that the actual data will end up being larger than our target size due to various factors of
+   * compression, serialization, which are outside our control. If this occurs, instead of making a
+   * single file that is close in size to our target, we would end up producing one file of the
+   * target size, and then a small extra file with the remaining data.
+   *
+   * <p>For example, if our target is 512 MB, we may generate a rewrite task that should be 500 MB.
+   * When we write the data we may find we actually have to write out 530 MB. If we use the target
+   * size while writing, we would produce a 512 MB file and an 18 MB file. If instead we use a
+   * larger size estimated by this method, then we end up writing a single file.
+   *
+   * @return the target size plus one half of the distance between max and target
+   */
+  protected long writeMaxFileSize() {
+    return (long) (targetFileSize + ((maxFileSize - targetFileSize) * 0.5));
+  }
+
+  private Map<String, Long> sizeThresholds(Map<String, String> options) {
+    long target =
+        PropertyUtil.propertyAsLong(options, TARGET_FILE_SIZE_BYTES, defaultTargetFileSize());
+
+    long defaultMin = (long) (target * MIN_FILE_SIZE_DEFAULT_RATIO);
+    long min = PropertyUtil.propertyAsLong(options, MIN_FILE_SIZE_BYTES, defaultMin);
+
+    long defaultMax = (long) (target * MAX_FILE_SIZE_DEFAULT_RATIO);
+    long max = PropertyUtil.propertyAsLong(options, MAX_FILE_SIZE_BYTES, defaultMax);
+
+    Preconditions.checkArgument(
+        target > 0, "'%s' is set to %s but must be > 0", TARGET_FILE_SIZE_BYTES, target);
+
+    Preconditions.checkArgument(
+        min >= 0, "'%s' is set to %s but must be >= 0", MIN_FILE_SIZE_BYTES, min);
+
+    Preconditions.checkArgument(
+        target > min,
+        "'%s' (%s) must be > '%s' (%s), all new files will be smaller than the min threshold",
+        TARGET_FILE_SIZE_BYTES,
+        target,
+        MIN_FILE_SIZE_BYTES,
+        min);
+
+    Preconditions.checkArgument(
+        target < max,
+        "'%s' (%s) must be < '%s' (%s), all new files will be larger than the max threshold",
+        TARGET_FILE_SIZE_BYTES,
+        target,
+        MAX_FILE_SIZE_BYTES,
+        max);
+
+    Map<String, Long> values = Maps.newHashMap();
+
+    values.put(TARGET_FILE_SIZE_BYTES, target);
+    values.put(MIN_FILE_SIZE_BYTES, min);
+    values.put(MAX_FILE_SIZE_BYTES, max);
+
+    return values;
+  }
+
+  private int minInputFiles(Map<String, String> options) {
+    int value = PropertyUtil.propertyAsInt(options, MIN_INPUT_FILES, MIN_INPUT_FILES_DEFAULT);
+    Preconditions.checkArgument(
+        value > 0, "'%s' is set to %s but must be > 0", MIN_INPUT_FILES, value);
+    return value;
+  }
+
+  private long maxGroupSize(Map<String, String> options) {
+    long value =
+        PropertyUtil.propertyAsLong(
+            options, MAX_FILE_GROUP_SIZE_BYTES, MAX_FILE_GROUP_SIZE_BYTES_DEFAULT);
+    Preconditions.checkArgument(
+        value > 0, "'%s' is set to %s but must be > 0", MAX_FILE_GROUP_SIZE_BYTES, value);
+    return value;
+  }
+
+  private boolean rewriteAll(Map<String, String> options) {
+    return PropertyUtil.propertyAsBoolean(options, REWRITE_ALL, REWRITE_ALL_DEFAULT);
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java b/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java
index d08f0940f5..59decb8020 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java
@@ -37,7 +37,10 @@ import org.apache.iceberg.util.SortOrderUtil;
  * would be chosen by {@link BinPackStrategy} will be rewrite candidates.
  *
  * <p>In the future other algorithms for determining files to rewrite will be provided.
+ *
+ * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SizeBasedFileRewriter} instead.
  */
+@Deprecated
 public abstract class SortStrategy extends BinPackStrategy {
 
   private SortOrder sortOrder;
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index d13e0967b6..44aca898b6 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -390,7 +390,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
     AssertHelpers.assertThrows(
         "Should reject calls with error message",
         IllegalArgumentException.class,
-        "Cannot set strategy to sort, it has already been set",
+        "Must use only one rewriter type (bin-pack, sort, zorder)",
         () ->
             sql(
                 "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'binpack', "
@@ -401,7 +401,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
     AssertHelpers.assertThrows(
         "Should reject calls with error message",
         IllegalArgumentException.class,
-        "Can't use SORT when there is no sort order",
+        "Cannot sort data without a valid sort order",
         () ->
             sql(
                 "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort')",
@@ -455,7 +455,7 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
     AssertHelpers.assertThrows(
         "Should reject calls with error message",
         IllegalArgumentException.class,
-        "Cannot find column 'col1' in table schema: "
+        "Cannot find column 'col1' in table schema (case sensitive = false): "
             + "struct<1: c1: optional int, 2: c2: optional string, 3: c3: optional string>",
         () ->
             sql(
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index e3db8fe9dc..5f95ef3ed4 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.actions;
 
 import java.io.IOException;
 import java.math.RoundingMode;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
@@ -37,13 +38,11 @@ import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.actions.BinPackStrategy;
+import org.apache.iceberg.actions.FileRewriter;
 import org.apache.iceberg.actions.ImmutableRewriteDataFiles;
 import org.apache.iceberg.actions.RewriteDataFiles;
 import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
 import org.apache.iceberg.actions.RewriteFileGroup;
-import org.apache.iceberg.actions.RewriteStrategy;
-import org.apache.iceberg.actions.SortStrategy;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -92,7 +91,7 @@ public class RewriteDataFilesSparkAction
   private boolean partialProgressEnabled;
   private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
-  private RewriteStrategy strategy = null;
+  private FileRewriter<FileScanTask, DataFile> rewriter = null;
 
   RewriteDataFilesSparkAction(SparkSession spark, Table table) {
     super(spark.cloneSession());
@@ -109,40 +108,32 @@ public class RewriteDataFilesSparkAction
   @Override
   public RewriteDataFilesSparkAction binPack() {
     Preconditions.checkArgument(
-        this.strategy == null,
-        "Cannot set strategy to binpack, it has already been set",
-        this.strategy);
-    this.strategy = binPackStrategy();
+        rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
+    this.rewriter = new SparkBinPackDataRewriter(spark(), table);
     return this;
   }
 
   @Override
   public RewriteDataFilesSparkAction sort(SortOrder sortOrder) {
     Preconditions.checkArgument(
-        this.strategy == null,
-        "Cannot set strategy to sort, it has already been set to %s",
-        this.strategy);
-    this.strategy = sortStrategy().sortOrder(sortOrder);
+        rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
+    this.rewriter = new SparkSortDataRewriter(spark(), table, sortOrder);
     return this;
   }
 
   @Override
   public RewriteDataFilesSparkAction sort() {
     Preconditions.checkArgument(
-        this.strategy == null,
-        "Cannot set strategy to sort, it has already been set to %s",
-        this.strategy);
-    this.strategy = sortStrategy();
+        rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
+    this.rewriter = new SparkSortDataRewriter(spark(), table);
     return this;
   }
 
   @Override
   public RewriteDataFilesSparkAction zOrder(String... columnNames) {
     Preconditions.checkArgument(
-        this.strategy == null,
-        "Cannot set strategy to zorder, it has already been set to %s",
-        this.strategy);
-    this.strategy = zOrderStrategy(columnNames);
+        rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
+    this.rewriter = new SparkZOrderDataRewriter(spark(), table, Arrays.asList(columnNames));
     return this;
   }
 
@@ -161,8 +152,8 @@ public class RewriteDataFilesSparkAction
     long startingSnapshotId = table.currentSnapshot().snapshotId();
 
     // Default to BinPack if no strategy selected
-    if (this.strategy == null) {
-      this.strategy = binPackStrategy();
+    if (this.rewriter == null) {
+      this.rewriter = new SparkBinPackDataRewriter(spark(), table);
     }
 
     validateAndInitOptions();
@@ -226,9 +217,8 @@ public class RewriteDataFilesSparkAction
 
       filesByPartition.forEach(
           (partition, tasks) -> {
-            Iterable<FileScanTask> filtered = strategy.selectFilesToRewrite(tasks);
-            Iterable<List<FileScanTask>> groupedTasks = strategy.planFileGroups(filtered);
-            List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(groupedTasks);
+            Iterable<List<FileScanTask>> plannedFileGroups = rewriter.planFileGroups(tasks);
+            List<List<FileScanTask>> fileGroups = ImmutableList.copyOf(plannedFileGroups);
             if (fileGroups.size() > 0) {
               fileGroupsByPartition.put(partition, fileGroups);
             }
@@ -250,7 +240,7 @@ public class RewriteDataFilesSparkAction
     Set<DataFile> addedFiles =
         withJobGroupInfo(
             newJobGroupInfo("REWRITE-DATA-FILES", desc),
-            () -> strategy.rewriteFiles(fileGroup.fileScans()));
+            () -> rewriter.rewrite(fileGroup.fileScans()));
 
     fileGroup.setOutputFiles(addedFiles);
     LOG.info("Rewrite Files Ready to be Committed - {}", desc);
@@ -418,7 +408,7 @@ public class RewriteDataFilesSparkAction
   }
 
   void validateAndInitOptions() {
-    Set<String> validOptions = Sets.newHashSet(strategy.validOptions());
+    Set<String> validOptions = Sets.newHashSet(rewriter.validOptions());
     validOptions.addAll(VALID_OPTIONS);
 
     Set<String> invalidKeys = Sets.newHashSet(options().keySet());
@@ -426,11 +416,11 @@ public class RewriteDataFilesSparkAction
 
     Preconditions.checkArgument(
         invalidKeys.isEmpty(),
-        "Cannot use options %s, they are not supported by the action or the strategy %s",
+        "Cannot use options %s, they are not supported by the action or the rewriter %s",
         invalidKeys,
-        strategy.name());
+        rewriter.description());
 
-    strategy = strategy.options(options());
+    rewriter.init(options());
 
     maxConcurrentFileGroupRewrites =
         PropertyUtil.propertyAsInt(
@@ -474,7 +464,7 @@ public class RewriteDataFilesSparkAction
       return String.format(
           "Rewriting %d files (%s, file group %d/%d, %s (%d/%d)) in %s",
           group.rewrittenFiles().size(),
-          strategy.name(),
+          rewriter.description(),
           group.info().globalIndex(),
           ctx.totalGroupCount(),
           partition,
@@ -485,25 +475,13 @@ public class RewriteDataFilesSparkAction
       return String.format(
           "Rewriting %d files (%s, file group %d/%d) in %s",
           group.rewrittenFiles().size(),
-          strategy.name(),
+          rewriter.description(),
           group.info().globalIndex(),
           ctx.totalGroupCount(),
           table.name());
     }
   }
 
-  private BinPackStrategy binPackStrategy() {
-    return new SparkBinPackStrategy(table, spark());
-  }
-
-  private SortStrategy sortStrategy() {
-    return new SparkSortStrategy(table, spark());
-  }
-
-  private SortStrategy zOrderStrategy(String... columnNames) {
-    return new SparkZOrderStrategy(table, spark(), Lists.newArrayList(columnNames));
-  }
-
   @VisibleForTesting
   static class RewriteExecutionContext {
     private final Map<StructLike, Integer> numGroupsByPartition;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
new file mode 100644
index 0000000000..21e94ef9b4
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A data rewriter that bin-packs a file group: the staged files are read as splits of a
+ * computed size and the resulting rows are appended back as new data files.
+ */
+class SparkBinPackDataRewriter extends SparkSizeBasedDataRewriter {
+
+  // padding (5 KiB) added to every estimated split size, see splitSize
+  private static final long SPLIT_OVERHEAD = 5 * 1024;
+
+  SparkBinPackDataRewriter(SparkSession spark, Table table) {
+    super(spark, table);
+  }
+
+  @Override
+  public String description() {
+    return "BIN-PACK";
+  }
+
+  /**
+   * Reads the task set staged under {@code groupId} and appends its rows through the Iceberg
+   * writer, passing the same ID as the rewritten file scan task set ID.
+   */
+  @Override
+  protected void doRewrite(String groupId, List<FileScanTask> group) {
+    // size the read splits so that each split is written out as one new file
+    long targetSplitSize = splitSize(inputSize(group));
+
+    Dataset<Row> packedDF =
+        spark()
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
+            .option(SparkReadOptions.SPLIT_SIZE, targetSplitSize)
+            .option(SparkReadOptions.FILE_OPEN_COST, "0")
+            .load(groupId);
+
+    // shuffle only when the group was written with a spec other than the current table spec
+    DistributionMode writeDistribution = distributionMode(group);
+
+    packedDF
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
+        .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+        .option(SparkWriteOptions.DISTRIBUTION_MODE, writeDistribution.modeName())
+        .mode("append")
+        .save(groupId);
+  }
+
+  // picks RANGE when the spec of the group differs from the table spec, NONE otherwise
+  private DistributionMode distributionMode(List<FileScanTask> group) {
+    if (group.get(0).spec().equals(table().spec())) {
+      return DistributionMode.NONE;
+    }
+
+    return DistributionMode.RANGE;
+  }
+
+  /**
+   * Computes the read split size: the input size divided evenly across the expected number of
+   * output files plus a fixed overhead, capped at the max write file size. The overhead is meant
+   * to keep small size estimation errors from producing brand-new files.
+   */
+  private long splitSize(long inputSize) {
+    long evenShare = inputSize / numOutputFiles(inputSize);
+    long paddedShare = evenShare + SPLIT_OVERHEAD;
+    return Math.min(paddedShare, writeMaxFileSize());
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
index 46aefd20af..07d3210ead 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java
@@ -35,6 +35,12 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 
+/**
+ * A Spark strategy to bin-pack data.
+ *
+ * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkBinPackDataRewriter} instead.
+ */
+@Deprecated
 public class SparkBinPackStrategy extends BinPackStrategy {
   private final Table table;
   private final SparkSession spark;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
new file mode 100644
index 0000000000..1add6383c6
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SortOrderUtil;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.distributions.Distributions;
+import org.apache.spark.sql.connector.distributions.OrderedDistribution;
+import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.internal.SQLConf;
+
+/**
+ * Base class for Spark data rewriters that shuffle a file group into a sorted layout before
+ * writing it back; subclasses define the ordering in {@link #sortedDF(Dataset, List)}.
+ */
+abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
+
+  /**
+   * The number of shuffle partitions and consequently the number of output files created by the
+   * Spark sort is based on the size of the input data files used in this file rewriter. Due to
+   * compression, the disk file sizes may not accurately represent the size of files in the output.
+   * This parameter lets the user adjust the file size used for estimating actual output data size.
+   * A factor greater than 1.0 would generate more files than we would expect based on the on-disk
+   * file size. A value less than 1.0 would create fewer files than we would expect based on the
+   * on-disk size.
+   */
+  public static final String COMPRESSION_FACTOR = "compression-factor";
+
+  public static final double COMPRESSION_FACTOR_DEFAULT = 1.0;
+
+  // multiplier applied to the on-disk input size, set in init
+  private double compressionFactor;
+
+  protected SparkShufflingDataRewriter(SparkSession spark, Table table) {
+    super(spark, table);
+  }
+
+  /**
+   * Applies the ordering of this rewriter to the rows of a file group.
+   *
+   * @param df the rows read from the staged file group
+   * @param group the tasks that make up the file group
+   * @return the rows of {@code df} in the order to write them
+   */
+  protected abstract Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group);
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(COMPRESSION_FACTOR)
+        .build();
+  }
+
+  @Override
+  public void init(Map<String, String> options) {
+    super.init(options);
+    this.compressionFactor = compressionFactor(options);
+  }
+
+  /**
+   * Reads the staged file group, sorts it through {@link #sortedDF(Dataset, List)} and appends
+   * the result with the table distribution and ordering disabled.
+   *
+   * <p>The shuffle partition count is set on a session cloned for this call, not on the session
+   * shared by the rewriter. The action exposes a max-concurrent-file-group-rewrites setting, so
+   * several groups may be rewritten at once; writing the count into the shared session would let
+   * one group overwrite the value another group is about to plan with.
+   */
+  @Override
+  public void doRewrite(String groupId, List<FileScanTask> group) {
+    SparkSession session = spark().cloneSession();
+
+    // the number of shuffle partition controls the number of output files
+    session.conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), numShufflePartitions(group));
+
+    Dataset<Row> scanDF =
+        session
+            .read()
+            .format("iceberg")
+            .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
+            .load(groupId);
+
+    Dataset<Row> sortedDF = sortedDF(scanDF, group);
+
+    sortedDF
+        .write()
+        .format("iceberg")
+        .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId)
+        .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize())
+        .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+        .mode("append")
+        .save(groupId);
+  }
+
+  /**
+   * Applies an ordered distribution and ordering built from the given Iceberg sort order.
+   *
+   * <p>The plan is built with the conf and session of {@code df} so that settings applied to the
+   * session that produced {@code df} (such as the shuffle partition count) are the ones used.
+   *
+   * @param df the dataset to sort
+   * @param sortOrder the Iceberg sort order to apply
+   * @return a new dataset over the sorted plan, using the encoder of {@code df}
+   */
+  protected Dataset<Row> sort(Dataset<Row> df, org.apache.iceberg.SortOrder sortOrder) {
+    SparkSession session = df.sparkSession();
+    SortOrder[] ordering = SparkDistributionAndOrderingUtil.convert(sortOrder);
+    OrderedDistribution distribution = Distributions.ordered(ordering);
+    SQLConf conf = session.sessionState().conf();
+    LogicalPlan plan = df.logicalPlan();
+    LogicalPlan sortPlan =
+        DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, ordering, plan, conf);
+    return new Dataset<>(session, sortPlan, df.encoder());
+  }
+
+  /**
+   * Returns the sort order to write a group with: the given order as is when the group already
+   * uses the current table spec, otherwise an order built by {@link SortOrderUtil#buildSortOrder}
+   * for the table and the given order.
+   */
+  protected org.apache.iceberg.SortOrder outputSortOrder(
+      List<FileScanTask> group, org.apache.iceberg.SortOrder sortOrder) {
+    boolean includePartitionColumns = !group.get(0).spec().equals(table().spec());
+    if (includePartitionColumns) {
+      // build in the requirement for partition sorting into our sort order
+      // as the original spec for this group does not match the output spec
+      return SortOrderUtil.buildSortOrder(table(), sortOrder);
+    } else {
+      return sortOrder;
+    }
+  }
+
+  // number of shuffle partitions for a group: the expected output file count, at least 1
+  private long numShufflePartitions(List<FileScanTask> group) {
+    long numOutputFiles = numOutputFiles((long) (inputSize(group) * compressionFactor));
+    return Math.max(1, numOutputFiles);
+  }
+
+  // reads the compression factor option and rejects values that are not positive
+  private double compressionFactor(Map<String, String> options) {
+    double value =
+        PropertyUtil.propertyAsDouble(options, COMPRESSION_FACTOR, COMPRESSION_FACTOR_DEFAULT);
+    Preconditions.checkArgument(
+        value > 0, "'%s' is set to %s but must be > 0", COMPRESSION_FACTOR, value);
+    return value;
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSizeBasedDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSizeBasedDataRewriter.java
new file mode 100644
index 0000000000..d40cbbb871
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSizeBasedDataRewriter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.SizeBasedDataRewriter;
+import org.apache.iceberg.spark.FileRewriteCoordinator;
+import org.apache.iceberg.spark.ScanTaskSetManager;
+import org.apache.iceberg.spark.SparkTableCache;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Base class for Spark data rewriters built on {@link SizeBasedDataRewriter}. It stages a file
+ * group under a generated ID, delegates the rewrite to {@link #doRewrite(String, List)} and
+ * returns the data files reported for that ID.
+ */
+abstract class SparkSizeBasedDataRewriter extends SizeBasedDataRewriter {
+
+  private final SparkSession spark;
+  private final SparkTableCache tableCache = SparkTableCache.get();
+  private final ScanTaskSetManager taskSetManager = ScanTaskSetManager.get();
+  private final FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();
+
+  SparkSizeBasedDataRewriter(SparkSession spark, Table table) {
+    super(table);
+    this.spark = spark;
+  }
+
+  /**
+   * Rewrites a file group that has already been staged.
+   *
+   * @param groupId the ID under which the table and the tasks of the group were staged
+   * @param group the tasks that make up the file group
+   */
+  protected abstract void doRewrite(String groupId, List<FileScanTask> group);
+
+  protected SparkSession spark() {
+    return spark;
+  }
+
+  /**
+   * Rewrites one file group and returns the data files produced for it.
+   *
+   * <p>Everything registered for the group is removed again when the rewrite ends, whether it
+   * succeeded or failed.
+   */
+  @Override
+  public Set<DataFile> rewrite(List<FileScanTask> group) {
+    String rewriteId = UUID.randomUUID().toString();
+    try {
+      register(rewriteId, group);
+      doRewrite(rewriteId, group);
+      return coordinator.fetchNewDataFiles(table(), rewriteId);
+    } finally {
+      unregister(rewriteId);
+    }
+  }
+
+  // makes the table and the tasks of a group available under the given ID
+  private void register(String id, List<FileScanTask> group) {
+    tableCache.add(id, table());
+    taskSetManager.stageTasks(table(), id, group);
+  }
+
+  // drops the cached table, the staged tasks and the rewrite state kept for the given ID
+  private void unregister(String id) {
+    tableCache.remove(id);
+    taskSetManager.removeTasks(table(), id);
+    coordinator.clearRewrite(table(), id);
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java
new file mode 100644
index 0000000000..4615f3cebc
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A data rewriter that sorts file groups by the sort order of the table or by an explicitly
+ * provided sort order.
+ */
+class SparkSortDataRewriter extends SparkShufflingDataRewriter {
+
+  private final SortOrder sortOrder;
+
+  // uses the sort order of the table, which must be sorted
+  SparkSortDataRewriter(SparkSession spark, Table table) {
+    super(spark, table);
+    SortOrder tableSortOrder = table.sortOrder();
+    Preconditions.checkArgument(
+        tableSortOrder.isSorted(),
+        "Cannot sort data without a valid sort order, table '%s' is unsorted and no sort order is provided",
+        table.name());
+    this.sortOrder = tableSortOrder;
+  }
+
+  // uses the provided sort order, which must be non-null and sorted
+  SparkSortDataRewriter(SparkSession spark, Table table, SortOrder sortOrder) {
+    super(spark, table);
+    boolean validSortOrder = sortOrder != null && sortOrder.isSorted();
+    Preconditions.checkArgument(
+        validSortOrder,
+        "Cannot sort data without a valid sort order, the provided sort order is null or empty");
+    this.sortOrder = sortOrder;
+  }
+
+  @Override
+  public String description() {
+    return "SORT";
+  }
+
+  @Override
+  protected Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group) {
+    SortOrder groupSortOrder = outputSortOrder(group, sortOrder);
+    return sort(df, groupSortOrder);
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
index 59aafc595a..21e29263c9 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java
@@ -47,6 +47,12 @@ import org.apache.spark.sql.connector.distributions.Distributions;
 import org.apache.spark.sql.connector.expressions.SortOrder;
 import org.apache.spark.sql.internal.SQLConf;
 
+/**
+ * A Spark strategy to sort data.
+ *
+ * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkSortDataRewriter} instead.
+ */
+@Deprecated
 public class SparkSortStrategy extends SortStrategy {
 
   /**
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
new file mode 100644
index 0000000000..68db76d37f
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import static org.apache.spark.sql.functions.array;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.NullOrder;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.ZOrderByteUtils;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A data rewriter that orders the rows of a file group by a Z-value computed from the configured
+ * columns.
+ */
+class SparkZOrderDataRewriter extends SparkShufflingDataRewriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkZOrderDataRewriter.class);
+
+  private static final String Z_COLUMN = "ICEZVALUE";
+  private static final Schema Z_SCHEMA =
+      new Schema(Types.NestedField.required(0, Z_COLUMN, Types.BinaryType.get()));
+  private static final SortOrder Z_SORT_ORDER =
+      SortOrder.builderFor(Z_SCHEMA)
+          .sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST)
+          .build();
+
+  /**
+   * Controls the amount of bytes interleaved in the ZOrder algorithm. Default is all bytes being
+   * interleaved.
+   */
+  public static final String MAX_OUTPUT_SIZE = "max-output-size";
+
+  public static final int MAX_OUTPUT_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+  /**
+   * Controls the number of bytes considered from an input column of a type with variable length
+   * (String, Binary).
+   *
+   * <p>Default is to use the same size as primitives {@link ZOrderByteUtils#PRIMITIVE_BUFFER_SIZE}.
+   */
+  public static final String VAR_LENGTH_CONTRIBUTION = "var-length-contribution";
+
+  public static final int VAR_LENGTH_CONTRIBUTION_DEFAULT = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE;
+
+  private final List<String> zOrderColNames;
+  private int maxOutputSize;
+  private int varLengthContribution;
+
+  SparkZOrderDataRewriter(SparkSession spark, Table table, List<String> zOrderColNames) {
+    super(spark, table);
+    this.zOrderColNames = validZOrderColNames(spark, table, zOrderColNames);
+  }
+
+  @Override
+  public String description() {
+    return "Z-ORDER";
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(MAX_OUTPUT_SIZE)
+        .add(VAR_LENGTH_CONTRIBUTION)
+        .build();
+  }
+
+  @Override
+  public void init(Map<String, String> options) {
+    super.init(options);
+    this.maxOutputSize = maxOutputSize(options);
+    this.varLengthContribution = varLengthContribution(options);
+  }
+
+  // adds a temporary Z-value column, sorts by it and drops it again
+  @Override
+  protected Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group) {
+    Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zValue(df));
+    Dataset<Row> sortedDF = sort(zValueDF, outputSortOrder(group, Z_SORT_ORDER));
+    return sortedDF.drop(Z_COLUMN);
+  }
+
+  // builds the Z-value expression by interleaving the bytes of the configured columns
+  private Column zValue(Dataset<Row> df) {
+    SparkZOrderUDF zOrderUDF =
+        new SparkZOrderUDF(zOrderColNames.size(), varLengthContribution, maxOutputSize);
+
+    Column[] zOrderCols =
+        zOrderColNames.stream()
+            .map(df.schema()::apply)
+            .map(col -> zOrderUDF.sortedLexicographically(df.col(col.name()), col.dataType()))
+            .toArray(Column[]::new);
+
+    return zOrderUDF.interleaveBytes(array(zOrderCols));
+  }
+
+  // reads the var-length contribution option, which must be at least 1
+  private int varLengthContribution(Map<String, String> options) {
+    int value =
+        PropertyUtil.propertyAsInt(
+            options, VAR_LENGTH_CONTRIBUTION, VAR_LENGTH_CONTRIBUTION_DEFAULT);
+    Preconditions.checkArgument(
+        value > 0,
+        "Cannot use less than 1 byte for variable length types with ZOrder, '%s' was set to %s",
+        VAR_LENGTH_CONTRIBUTION,
+        value);
+    return value;
+  }
+
+  // reads the max output size option, which must be at least 1
+  private int maxOutputSize(Map<String, String> options) {
+    int value = PropertyUtil.propertyAsInt(options, MAX_OUTPUT_SIZE, MAX_OUTPUT_SIZE_DEFAULT);
+    Preconditions.checkArgument(
+        value > 0,
+        "Cannot have the interleaved ZOrder value use less than 1 byte, '%s' was set to %s",
+        MAX_OUTPUT_SIZE,
+        value);
+    return value;
+  }
+
+  /**
+   * Validates the requested Z-order columns and resolves them to the column names of the table
+   * schema.
+   *
+   * <p>Columns are looked up according to the case sensitivity of the session. The names kept
+   * are the ones defined in the schema rather than the ones passed in, as they are later looked
+   * up in the schema of the scanned dataset by exact name. Identity partition columns are dropped
+   * with a warning.
+   *
+   * @throws IllegalArgumentException if no columns are given, a column cannot be found or only
+   *     identity partition columns are given
+   */
+  private List<String> validZOrderColNames(
+      SparkSession spark, Table table, List<String> inputZOrderColNames) {
+
+    Preconditions.checkArgument(
+        inputZOrderColNames != null && !inputZOrderColNames.isEmpty(),
+        "Cannot ZOrder when no columns are specified");
+
+    Schema schema = table.schema();
+    Set<Integer> identityPartitionFieldIds = table.spec().identitySourceIds();
+    boolean caseSensitive = SparkUtil.caseSensitive(spark);
+
+    List<String> validZOrderColNames = Lists.newArrayList();
+
+    for (String colName : inputZOrderColNames) {
+      Types.NestedField field =
+          caseSensitive ? schema.findField(colName) : schema.caseInsensitiveFindField(colName);
+      Preconditions.checkArgument(
+          field != null,
+          "Cannot find column '%s' in table schema (case sensitive = %s): %s",
+          colName,
+          caseSensitive,
+          schema.asStruct());
+
+      if (identityPartitionFieldIds.contains(field.fieldId())) {
+        LOG.warn("Ignoring '{}' as such values are constant within a partition", colName);
+      } else {
+        // keep the name as spelled in the schema: a case-insensitive match may differ in case
+        validZOrderColNames.add(schema.findColumnName(field.fieldId()));
+      }
+    }
+
+    Preconditions.checkArgument(
+        !validZOrderColNames.isEmpty(),
+        "Cannot ZOrder, all columns provided were identity partition columns and cannot be used");
+
+    return validZOrderColNames;
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
index fe04c1f4f1..26d2b4837b 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java
@@ -58,6 +58,12 @@ import org.apache.spark.sql.types.StructField;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * A Spark strategy to zOrder data.
+ *
+ * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkZOrderDataRewriter} instead.
+ */
+@Deprecated
 public class SparkZOrderStrategy extends SparkSortStrategy {
   private static final Logger LOG = LoggerFactory.getLogger(SparkZOrderStrategy.class);
 
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index bb062a5ba4..761284bb56 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -1232,23 +1232,26 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
   public void testInvalidAPIUsage() {
     Table table = createTable(1);
 
+    SortOrder sortOrder = SortOrder.builderFor(table.schema()).asc("c2").build();
+
     AssertHelpers.assertThrows(
         "Should be unable to set Strategy more than once",
         IllegalArgumentException.class,
-        "Cannot set strategy",
+        "Must use only one rewriter type",
         () -> actions().rewriteDataFiles(table).binPack().sort());
 
     AssertHelpers.assertThrows(
         "Should be unable to set Strategy more than once",
         IllegalArgumentException.class,
-        "Cannot set strategy",
-        () -> actions().rewriteDataFiles(table).sort().binPack());
+        "Must use only one rewriter type",
+        () -> actions().rewriteDataFiles(table).sort(sortOrder).binPack());
 
     AssertHelpers.assertThrows(
         "Should be unable to set Strategy more than once",
         IllegalArgumentException.class,
-        "Cannot set strategy",
-        () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
+        "Must use only one rewriter type",
+        // z-order first so that this case is not a repeat of the sort-then-bin-pack case above
+        () -> actions().rewriteDataFiles(table).zOrder("c2").binPack());
   }
 
   @Test
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
new file mode 100644
index 0000000000..6800ffd404
--- /dev/null
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.actions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MockFileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.SizeBasedDataRewriter;
+import org.apache.iceberg.actions.SizeBasedFileRewriter;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types.IntegerType;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StringType;
+import org.assertj.core.api.Assertions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSparkFileRewriter extends SparkTestBase {
+  // Fixtures shared by the tests below; SPEC and SORT_ORDER are both derived from SCHEMA.
+  private static final TableIdentifier TABLE_IDENT = TableIdentifier.of("default", "tbl");
+  private static final Schema SCHEMA =
+      new Schema(
+          NestedField.required(1, "id", IntegerType.get()),
+          NestedField.required(2, "dep", StringType.get()));
+  private static final PartitionSpec SPEC =
+      PartitionSpec.builderFor(SCHEMA).identity("dep").build();
+  private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build();
+
+  @After
+  public void removeTable() {
+    catalog.dropTable(TABLE_IDENT); // every test re-creates TABLE_IDENT, so drop it afterwards
+  }
+
+  // Runs every shared file-selection check against the bin-pack rewriter.
+  @Test
+  public void testBinPackDataSelectFiles() {
+    SparkBinPackDataRewriter binPack =
+        new SparkBinPackDataRewriter(spark, catalog.createTable(TABLE_IDENT, SCHEMA));
+    checkDataFileSizeFiltering(binPack);
+    checkDataFilesDeleteThreshold(binPack);
+    checkDataFileGroupWithEnoughFiles(binPack);
+    checkDataFileGroupWithEnoughData(binPack);
+    checkDataFileGroupWithTooMuchData(binPack);
+  }
+
+  // Runs every shared file-selection check against the sort rewriter built with SORT_ORDER.
+  @Test
+  public void testSortDataSelectFiles() {
+    SparkSortDataRewriter sorter =
+        new SparkSortDataRewriter(spark, catalog.createTable(TABLE_IDENT, SCHEMA), SORT_ORDER);
+    checkDataFileSizeFiltering(sorter);
+    checkDataFilesDeleteThreshold(sorter);
+    checkDataFileGroupWithEnoughFiles(sorter);
+    checkDataFileGroupWithEnoughData(sorter);
+    checkDataFileGroupWithTooMuchData(sorter);
+  }
+
+  // Runs every shared file-selection check against a z-order rewriter on the "id" column.
+  @Test
+  public void testZOrderDataSelectFiles() {
+    Table tbl = catalog.createTable(TABLE_IDENT, SCHEMA);
+    SparkZOrderDataRewriter zOrder =
+        new SparkZOrderDataRewriter(spark, tbl, ImmutableList.of("id"));
+    checkDataFileSizeFiltering(zOrder);
+    checkDataFilesDeleteThreshold(zOrder);
+    checkDataFileGroupWithEnoughFiles(zOrder);
+    checkDataFileGroupWithEnoughData(zOrder);
+    checkDataFileGroupWithTooMuchData(zOrder);
+  }
+
+  // Of three tasks sized below, inside and above the 250..750 range, exactly two are planned.
+  private void checkDataFileSizeFiltering(SizeBasedDataRewriter rewriter) {
+    rewriter.init(
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "250",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "500",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "750",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE)));
+
+    FileScanTask undersized = new MockFileScanTask(100L);
+    FileScanTask withinRange = new MockFileScanTask(450);
+    FileScanTask oversized = new MockFileScanTask(1000L);
+    List<FileScanTask> candidates = ImmutableList.of(undersized, withinRange, oversized);
+
+    Iterable<List<FileScanTask>> planned = rewriter.planFileGroups(candidates);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(planned));
+    List<FileScanTask> onlyGroup = Iterables.getOnlyElement(planned);
+    Assert.assertEquals("Must rewrite 2 files", 2, onlyGroup.size());
+  }
+
+  // Threshold 2: of the tasks mocked with 3 and 1 deletes, exactly one must be planned.
+  private void checkDataFilesDeleteThreshold(SizeBasedDataRewriter rewriter) {
+    rewriter.init(
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "1",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "2000",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "5000",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "2"));
+
+    FileScanTask threeDeletes = MockFileScanTask.mockTaskWithDeletes(1000L, 3);
+    FileScanTask oneDelete = MockFileScanTask.mockTaskWithDeletes(1000L, 1);
+    List<FileScanTask> candidates = ImmutableList.of(threeDeletes, oneDelete);
+
+    Iterable<List<FileScanTask>> planned = rewriter.planFileGroups(candidates);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(planned));
+    List<FileScanTask> onlyGroup = Iterables.getOnlyElement(planned);
+    Assert.assertEquals("Must rewrite 1 file", 1, onlyGroup.size());
+  }
+
+  // Four tasks of length 100 with min-input-files = 3: all four must land in a single group.
+  private void checkDataFileGroupWithEnoughFiles(SizeBasedDataRewriter rewriter) {
+    rewriter.init(
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_INPUT_FILES, "3",
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "150",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "1000",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "5000",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE)));
+
+    List<FileScanTask> candidates =
+        ImmutableList.of(
+            new MockFileScanTask(100L),
+            new MockFileScanTask(100L),
+            new MockFileScanTask(100L),
+            new MockFileScanTask(100L));
+
+    Iterable<List<FileScanTask>> planned = rewriter.planFileGroups(candidates);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(planned));
+    List<FileScanTask> onlyGroup = Iterables.getOnlyElement(planned);
+    Assert.assertEquals("Must rewrite 4 files", 4, onlyGroup.size());
+  }
+
+  // Three tasks of length 100 with min-input-files = 5 and target 250: all three are planned.
+  private void checkDataFileGroupWithEnoughData(SizeBasedDataRewriter rewriter) {
+    rewriter.init(
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_INPUT_FILES, "5",
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "200",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "250",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "500",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE)));
+
+    List<FileScanTask> candidates =
+        ImmutableList.of(
+            new MockFileScanTask(100L), new MockFileScanTask(100L), new MockFileScanTask(100L));
+
+    Iterable<List<FileScanTask>> planned = rewriter.planFileGroups(candidates);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(planned));
+    List<FileScanTask> onlyGroup = Iterables.getOnlyElement(planned);
+    Assert.assertEquals("Must rewrite 3 files", 3, onlyGroup.size());
+  }
+
+  // A single task of length 2000 (max-file-size is 500) must still be planned on its own.
+  private void checkDataFileGroupWithTooMuchData(SizeBasedDataRewriter rewriter) {
+    rewriter.init(
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_INPUT_FILES, "5",
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "200",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "250",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "500",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE)));
+
+    List<FileScanTask> candidates = ImmutableList.of(new MockFileScanTask(2000L));
+
+    Iterable<List<FileScanTask>> planned = rewriter.planFileGroups(candidates);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(planned));
+    List<FileScanTask> onlyGroup = Iterables.getOnlyElement(planned);
+    Assert.assertEquals("Must rewrite big file", 1, onlyGroup.size());
+  }
+
+  // The sort rewriter must reject construction with a missing, null or unsorted sort order.
+  @Test
+  public void testInvalidConstructorUsagesSortData() {
+    Table tbl = catalog.createTable(TABLE_IDENT, SCHEMA);
+    String commonError = "Cannot sort data without a valid sort order";
+
+    Assertions.assertThatThrownBy(() -> new SparkSortDataRewriter(spark, tbl))
+        .hasMessageContaining(commonError)
+        .hasMessageContaining("is unsorted and no sort order is provided");
+    Assertions.assertThatThrownBy(() -> new SparkSortDataRewriter(spark, tbl, null))
+        .hasMessageContaining(commonError)
+        .hasMessageContaining("the provided sort order is null or empty");
+    Assertions.assertThatThrownBy(
+            () -> new SparkSortDataRewriter(spark, tbl, SortOrder.unsorted()))
+        .hasMessageContaining(commonError)
+        .hasMessageContaining("the provided sort order is null or empty");
+  }
+
+  // On a table partitioned by identity(dep): no columns, or only "dep" in any case, is rejected.
+  @Test
+  public void testInvalidConstructorUsagesZOrderData() {
+    Table tbl = catalog.createTable(TABLE_IDENT, SCHEMA, SPEC);
+    String identityOnly = "all columns provided were identity partition columns";
+
+    Assertions.assertThatThrownBy(() -> new SparkZOrderDataRewriter(spark, tbl, null))
+        .hasMessageContaining("Cannot ZOrder when no columns are specified");
+    Assertions.assertThatThrownBy(
+            () -> new SparkZOrderDataRewriter(spark, tbl, ImmutableList.of()))
+        .hasMessageContaining("Cannot ZOrder when no columns are specified");
+
+    Assertions.assertThatThrownBy(
+            () -> new SparkZOrderDataRewriter(spark, tbl, ImmutableList.of("dep")))
+        .hasMessageContaining("Cannot ZOrder")
+        .hasMessageContaining(identityOnly);
+    Assertions.assertThatThrownBy(
+            () -> new SparkZOrderDataRewriter(spark, tbl, ImmutableList.of("DeP")))
+        .hasMessageContaining("Cannot ZOrder")
+        .hasMessageContaining(identityOnly);
+  }
+
+  // Pins the exact set of option keys reported by the bin-pack rewriter.
+  @Test
+  public void testBinPackDataValidOptions() {
+    Table tbl = catalog.createTable(TABLE_IDENT, SCHEMA);
+    ImmutableSet<String> expectedOptions =
+        ImmutableSet.of(
+            SparkBinPackDataRewriter.TARGET_FILE_SIZE_BYTES,
+            SparkBinPackDataRewriter.MIN_FILE_SIZE_BYTES,
+            SparkBinPackDataRewriter.MAX_FILE_SIZE_BYTES,
+            SparkBinPackDataRewriter.MIN_INPUT_FILES,
+            SparkBinPackDataRewriter.REWRITE_ALL,
+            SparkBinPackDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
+            SparkBinPackDataRewriter.DELETE_FILE_THRESHOLD);
+    SparkBinPackDataRewriter binPack = new SparkBinPackDataRewriter(spark, tbl);
+    Assert.assertEquals(
+        "Rewriter must report all supported options", expectedOptions, binPack.validOptions());
+  }
+
+  // Pins the exact set of option keys reported by the sort rewriter.
+  @Test
+  public void testSortDataValidOptions() {
+    Table tbl = catalog.createTable(TABLE_IDENT, SCHEMA);
+    ImmutableSet<String> expectedOptions =
+        ImmutableSet.of(
+            SparkSortDataRewriter.TARGET_FILE_SIZE_BYTES,
+            SparkSortDataRewriter.MIN_FILE_SIZE_BYTES,
+            SparkSortDataRewriter.MAX_FILE_SIZE_BYTES,
+            SparkSortDataRewriter.MIN_INPUT_FILES,
+            SparkSortDataRewriter.REWRITE_ALL,
+            SparkSortDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
+            SparkSortDataRewriter.DELETE_FILE_THRESHOLD,
+            SparkSortDataRewriter.COMPRESSION_FACTOR);
+    SparkSortDataRewriter sorter = new SparkSortDataRewriter(spark, tbl, SORT_ORDER);
+    Assert.assertEquals(
+        "Rewriter must report all supported options", expectedOptions, sorter.validOptions());
+  }
+
+  // Pins the exact set of option keys reported by the z-order rewriter.
+  @Test
+  public void testZOrderDataValidOptions() {
+    Table tbl = catalog.createTable(TABLE_IDENT, SCHEMA);
+    ImmutableSet<String> expectedOptions =
+        ImmutableSet.of(
+            SparkZOrderDataRewriter.TARGET_FILE_SIZE_BYTES,
+            SparkZOrderDataRewriter.MIN_FILE_SIZE_BYTES,
+            SparkZOrderDataRewriter.MAX_FILE_SIZE_BYTES,
+            SparkZOrderDataRewriter.MIN_INPUT_FILES,
+            SparkZOrderDataRewriter.REWRITE_ALL,
+            SparkZOrderDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
+            SparkZOrderDataRewriter.DELETE_FILE_THRESHOLD,
+            SparkZOrderDataRewriter.COMPRESSION_FACTOR,
+            SparkZOrderDataRewriter.MAX_OUTPUT_SIZE,
+            SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION);
+    SparkZOrderDataRewriter zOrder =
+        new SparkZOrderDataRewriter(spark, tbl, ImmutableList.of("id"));
+    Assert.assertEquals(
+        "Rewriter must report all supported options", expectedOptions, zOrder.validOptions());
+  }
+
+  // Shared size-based option checks plus the delete-file-threshold lower bound for bin-pack.
+  @Test
+  public void testInvalidValuesForBinPackDataOptions() {
+    Table tbl = catalog.createTable(TABLE_IDENT, SCHEMA);
+    SparkBinPackDataRewriter binPack = new SparkBinPackDataRewriter(spark, tbl);
+    validateSizeBasedRewriterOptions(binPack);
+
+    Map<String, String> negativeDeleteThreshold =
+        ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
+    Assertions.assertThatThrownBy(() -> binPack.init(negativeDeleteThreshold))
+        .hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");
+  }
+
+  // Shared size-based option checks plus delete-threshold and compression-factor bounds.
+  @Test
+  public void testInvalidValuesForSortDataOptions() {
+    Table tbl = catalog.createTable(TABLE_IDENT, SCHEMA);
+    SparkSortDataRewriter sorter = new SparkSortDataRewriter(spark, tbl, SORT_ORDER);
+    validateSizeBasedRewriterOptions(sorter);
+
+    Map<String, String> negativeDeleteThreshold =
+        ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
+    Assertions.assertThatThrownBy(() -> sorter.init(negativeDeleteThreshold))
+        .hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");
+
+    Map<String, String> zeroCompressionFactor =
+        ImmutableMap.of(SparkShufflingDataRewriter.COMPRESSION_FACTOR, "0");
+    Assertions.assertThatThrownBy(() -> sorter.init(zeroCompressionFactor))
+        .hasMessageContaining("'compression-factor' is set to 0.0 but must be > 0");
+  }
+
+  // Shared size-based option checks plus the bounds of the shuffle and z-order specific options.
+  @Test
+  public void testInvalidValuesForZOrderDataOptions() {
+    Table tbl = catalog.createTable(TABLE_IDENT, SCHEMA);
+    SparkZOrderDataRewriter zOrder =
+        new SparkZOrderDataRewriter(spark, tbl, ImmutableList.of("id"));
+
+    validateSizeBasedRewriterOptions(zOrder);
+
+    Map<String, String> negativeDeleteThreshold =
+        ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
+    Assertions.assertThatThrownBy(() -> zOrder.init(negativeDeleteThreshold))
+        .hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");
+
+    Map<String, String> zeroCompressionFactor =
+        ImmutableMap.of(SparkShufflingDataRewriter.COMPRESSION_FACTOR, "0");
+    Assertions.assertThatThrownBy(() -> zOrder.init(zeroCompressionFactor))
+        .hasMessageContaining("'compression-factor' is set to 0.0 but must be > 0");
+
+    Map<String, String> zeroMaxOutputSize =
+        ImmutableMap.of(SparkZOrderDataRewriter.MAX_OUTPUT_SIZE, "0");
+    Assertions.assertThatThrownBy(() -> zOrder.init(zeroMaxOutputSize))
+        .hasMessageContaining("Cannot have the interleaved ZOrder value use less than 1 byte")
+        .hasMessageContaining("'max-output-size' was set to 0");
+
+    Map<String, String> zeroVarLengthContribution =
+        ImmutableMap.of(SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION, "0");
+    Assertions.assertThatThrownBy(() -> zOrder.init(zeroVarLengthContribution))
+        .hasMessageContaining("Cannot use less than 1 byte for variable length types with ZOrder")
+        .hasMessageContaining("'var-length-contribution' was set to 0");
+  }
+
+  // Feeds init() one invalid size-based option set at a time and checks the reported message.
+  private void validateSizeBasedRewriterOptions(SizeBasedFileRewriter<?, ?> rewriter) {
+    Map<String, String> zeroTargetSize =
+        ImmutableMap.of(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, "0");
+    Assertions.assertThatThrownBy(() -> rewriter.init(zeroTargetSize))
+        .hasMessageContaining("'target-file-size-bytes' is set to 0 but must be > 0");
+
+    Map<String, String> negativeMinSize =
+        ImmutableMap.of(SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, "-1");
+    Assertions.assertThatThrownBy(() -> rewriter.init(negativeMinSize))
+        .hasMessageContaining("'min-file-size-bytes' is set to -1 but must be >= 0");
+
+    Map<String, String> targetBelowMin =
+        ImmutableMap.of(
+            SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, "3",
+            SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, "5");
+    Assertions.assertThatThrownBy(() -> rewriter.init(targetBelowMin))
+        .hasMessageContaining("'target-file-size-bytes' (3) must be > 'min-file-size-bytes' (5)")
+        .hasMessageContaining("all new files will be smaller than the min threshold");
+
+    Map<String, String> targetAboveMax =
+        ImmutableMap.of(
+            SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, "5",
+            SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, "3");
+    Assertions.assertThatThrownBy(() -> rewriter.init(targetAboveMax))
+        .hasMessageContaining("'target-file-size-bytes' (5) must be < 'max-file-size-bytes' (3)")
+        .hasMessageContaining("all new files will be larger than the max threshold");
+
+    Map<String, String> zeroMinInputFiles =
+        ImmutableMap.of(SizeBasedFileRewriter.MIN_INPUT_FILES, "0");
+    Assertions.assertThatThrownBy(() -> rewriter.init(zeroMinInputFiles))
+        .hasMessageContaining("'min-input-files' is set to 0 but must be > 0");
+    Map<String, String> zeroMaxGroupSize =
+        ImmutableMap.of(SizeBasedFileRewriter.MAX_FILE_GROUP_SIZE_BYTES, "0");
+    Assertions.assertThatThrownBy(() -> rewriter.init(zeroMaxGroupSize))
+        .hasMessageContaining("'max-file-group-size-bytes' is set to 0 but must be > 0");
+  }
+}