You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/04/27 21:00:49 UTC

[iceberg] branch master updated: Spark 3.4: Remove no longer needed write extensions (#7443)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 91327e7c57 Spark 3.4: Remove no longer needed write extensions (#7443)
91327e7c57 is described below

commit 91327e7c57541c767cf0dcca2869570cb2e5b61c
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Thu Apr 27 14:00:43 2023 -0700

    Spark 3.4: Remove no longer needed write extensions (#7443)
---
 .../catalyst/analysis/RewriteMergeIntoTable.scala  |   4 +-
 .../analysis/RewriteRowLevelIcebergCommand.scala   |   4 +-
 .../expressions/ExtendedV2ExpressionUtils.scala    | 103 -----------
 .../catalyst/plans/logical/WriteIcebergDelta.scala |   6 +-
 .../v2/ExtendedDistributionAndOrderingUtils.scala  |  85 ---------
 .../datasources/v2/ExtendedV2Writes.scala          |  66 +------
 .../RowLevelCommandDynamicPruning.scala            |   6 +-
 .../spark/extensions/TestIcebergExpressions.java   |  74 --------
 .../extensions/TestRewriteDataFilesProcedure.java  |   8 +
 .../java/org/apache/iceberg/spark/BaseCatalog.java |  37 +---
 .../org/apache/iceberg/spark/SortOrderToSpark.java |   2 +-
 .../iceberg/spark/SparkCachedTableCatalog.java     |   2 +-
 .../iceberg/spark/SparkFunctionCatalog.java}       |  37 ++--
 .../apache/iceberg/spark/SupportsFunctions.java    |  63 +++++++
 .../spark/actions/SparkShufflingDataRewriter.java  |  83 ++++++---
 .../spark/actions/SparkSortDataRewriter.java       |  12 +-
 .../spark/actions/SparkZOrderDataRewriter.java     |  11 +-
 .../iceberg/spark/source/SparkWriteBuilder.java    |  12 +-
 .../expressions/TransformExpressions.scala         | 160 -----------------
 .../utils/DistributionAndOrderingUtils.scala       | 189 ---------------------
 .../spark/actions/TestRemoveOrphanFilesAction.java |   8 +-
 .../spark/actions/TestRewriteDataFilesAction.java  |  10 ++
 .../TestRequiredDistributionAndOrdering.java       |  17 +-
 23 files changed, 209 insertions(+), 790 deletions(-)

diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
index c01306ccf5..ee94c2b2fd 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala
@@ -27,13 +27,13 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.AttributeSet
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.ExtendedV2ExpressionUtils
 import org.apache.spark.sql.catalyst.expressions.IsNotNull
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.Literal.FalseLiteral
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.expressions.MonotonicallyIncreasingID
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
+import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
 import org.apache.spark.sql.catalyst.plans.FullOuter
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.LeftAnti
@@ -390,7 +390,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelIcebergCommand with Predicat
   }
 
   private def resolveAttrRef(ref: NamedReference, plan: LogicalPlan): AttributeReference = {
-    ExtendedV2ExpressionUtils.resolveRef[AttributeReference](ref, plan)
+    V2ExpressionUtils.resolveRef[AttributeReference](ref, plan)
   }
 
   private def buildMergeDeltaProjections(
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelIcebergCommand.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelIcebergCommand.scala
index b460f648d2..abadab4e53 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelIcebergCommand.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteRowLevelIcebergCommand.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.ProjectingInternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.expressions.ExtendedV2ExpressionUtils
+import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
 import org.apache.spark.sql.connector.write.RowLevelOperation
@@ -73,7 +73,7 @@ trait RewriteRowLevelIcebergCommand extends RewriteRowLevelCommand {
 
     operation match {
       case supportsDelta: SupportsDelta =>
-        val rowIdAttrs = ExtendedV2ExpressionUtils.resolveRefs[AttributeReference](
+        val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
           supportsDelta.rowId.toSeq,
           relation)
 
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtendedV2ExpressionUtils.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtendedV2ExpressionUtils.scala
deleted file mode 100644
index 16ff67a705..0000000000
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExtendedV2ExpressionUtils.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.connector.expressions.{Expression => V2Expression}
-import org.apache.spark.sql.connector.expressions.{SortDirection => V2SortDirection}
-import org.apache.spark.sql.connector.expressions.{NullOrdering => V2NullOrdering}
-import org.apache.spark.sql.connector.expressions.BucketTransform
-import org.apache.spark.sql.connector.expressions.DaysTransform
-import org.apache.spark.sql.connector.expressions.FieldReference
-import org.apache.spark.sql.connector.expressions.HoursTransform
-import org.apache.spark.sql.connector.expressions.IdentityTransform
-import org.apache.spark.sql.connector.expressions.MonthsTransform
-import org.apache.spark.sql.connector.expressions.NamedReference
-import org.apache.spark.sql.connector.expressions.SortValue
-import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.connector.expressions.TruncateTransform
-import org.apache.spark.sql.connector.expressions.YearsTransform
-import org.apache.spark.sql.errors.QueryCompilationErrors
-
-/**
- * A class that is inspired by V2ExpressionUtils in Spark but supports Iceberg transforms.
- */
-object ExtendedV2ExpressionUtils extends SQLConfHelper {
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
-
-  def resolveRef[T <: NamedExpression](ref: NamedReference, plan: LogicalPlan): T = {
-    plan.resolve(ref.fieldNames.toSeq, conf.resolver) match {
-      case Some(namedExpr) =>
-        namedExpr.asInstanceOf[T]
-      case None =>
-        val name = ref.fieldNames.toSeq.quoted
-        val outputString = plan.output.map(_.name).mkString(",")
-        throw QueryCompilationErrors.cannotResolveAttributeError(name, outputString)
-    }
-  }
-
-  def resolveRefs[T <: NamedExpression](refs: Seq[NamedReference], plan: LogicalPlan): Seq[T] = {
-    refs.map(ref => resolveRef[T](ref, plan))
-  }
-
-  def toCatalyst(expr: V2Expression, query: LogicalPlan): Expression = {
-    expr match {
-      case SortValue(child, direction, nullOrdering) =>
-        val catalystChild = toCatalyst(child, query)
-        SortOrder(catalystChild, toCatalyst(direction), toCatalyst(nullOrdering), Seq.empty)
-      case IdentityTransform(ref) =>
-        resolveRef[NamedExpression](ref, query)
-      case t: Transform if BucketTransform.unapply(t).isDefined =>
-        t match {
-            // sort columns will be empty for bucket.
-          case BucketTransform(numBuckets, cols, _) =>
-            IcebergBucketTransform(numBuckets, resolveRef[NamedExpression](cols.head, query))
-          case _ => t.asInstanceOf[Expression]
-            // do nothing
-        }
-      case TruncateTransform(length, ref) =>
-        IcebergTruncateTransform(resolveRef[NamedExpression](ref, query), length)
-      case YearsTransform(ref) =>
-        IcebergYearTransform(resolveRef[NamedExpression](ref, query))
-      case MonthsTransform(ref) =>
-        IcebergMonthTransform(resolveRef[NamedExpression](ref, query))
-      case DaysTransform(ref) =>
-        IcebergDayTransform(resolveRef[NamedExpression](ref, query))
-      case HoursTransform(ref) =>
-        IcebergHourTransform(resolveRef[NamedExpression](ref, query))
-      case ref: FieldReference =>
-        resolveRef[NamedExpression](ref, query)
-      case _ =>
-        throw new AnalysisException(s"$expr is not currently supported")
-    }
-  }
-
-  private def toCatalyst(direction: V2SortDirection): SortDirection = direction match {
-    case V2SortDirection.ASCENDING => Ascending
-    case V2SortDirection.DESCENDING => Descending
-  }
-
-  private def toCatalyst(nullOrdering: V2NullOrdering): NullOrdering = nullOrdering match {
-    case V2NullOrdering.NULLS_FIRST => NullsFirst
-    case V2NullOrdering.NULLS_LAST => NullsLast
-  }
-}
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/WriteIcebergDelta.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/WriteIcebergDelta.scala
index 10db698b9b..8495856fb6 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/WriteIcebergDelta.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/WriteIcebergDelta.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.analysis.NamedRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.expressions.ExtendedV2ExpressionUtils
 import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.OPERATION_COLUMN
 import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
@@ -80,7 +80,7 @@ case class WriteIcebergDelta(
   }
 
   private def rowIdAttrsResolved: Boolean = {
-    val rowIdAttrs = ExtendedV2ExpressionUtils.resolveRefs[AttributeReference](
+    val rowIdAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
       operation.rowId.toSeq,
       originalTable)
 
@@ -92,7 +92,7 @@ case class WriteIcebergDelta(
   private def metadataAttrsResolved: Boolean = {
     projections.metadataProjection match {
       case Some(projection) =>
-        val metadataAttrs = ExtendedV2ExpressionUtils.resolveRefs[AttributeReference](
+        val metadataAttrs = V2ExpressionUtils.resolveRefs[AttributeReference](
           operation.requiredMetadataAttributes.toSeq,
           originalTable)
 
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDistributionAndOrderingUtils.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDistributionAndOrderingUtils.scala
deleted file mode 100644
index 8c37b1b759..0000000000
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedDistributionAndOrderingUtils.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.ExtendedV2ExpressionUtils.toCatalyst
-import org.apache.spark.sql.catalyst.expressions.SortOrder
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
-import org.apache.spark.sql.catalyst.plans.logical.Sort
-import org.apache.spark.sql.connector.distributions.ClusteredDistribution
-import org.apache.spark.sql.connector.distributions.OrderedDistribution
-import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution
-import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering
-import org.apache.spark.sql.connector.write.Write
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.internal.SQLConf
-import scala.collection.compat.immutable.ArraySeq
-
-/**
- * A rule that is inspired by DistributionAndOrderingUtils in Spark but supports Iceberg transforms.
- *
- * Note that similarly to the original rule in Spark, it does not let AQE pick the number of shuffle
- * partitions. See SPARK-34230 for context.
- */
-object ExtendedDistributionAndOrderingUtils {
-
-  def prepareQuery(write: Write, query: LogicalPlan, conf: SQLConf): LogicalPlan = write match {
-    case write: RequiresDistributionAndOrdering =>
-      val numPartitions = write.requiredNumPartitions()
-      val distribution = write.requiredDistribution match {
-        case d: OrderedDistribution => d.ordering.map(e => toCatalyst(e, query))
-        case d: ClusteredDistribution => d.clustering.map(e => toCatalyst(e, query))
-        case _: UnspecifiedDistribution => Array.empty[Expression]
-      }
-
-      val queryWithDistribution = if (distribution.nonEmpty) {
-        val finalNumPartitions = if (numPartitions > 0) {
-          numPartitions
-        } else {
-          conf.numShufflePartitions
-        }
-        // the conversion to catalyst expressions above produces SortOrder expressions
-        // for OrderedDistribution and generic expressions for ClusteredDistribution
-        // this allows RepartitionByExpression to pick either range or hash partitioning
-        RepartitionByExpression(ArraySeq.unsafeWrapArray(distribution), query, finalNumPartitions)
-      } else if (numPartitions > 0) {
-        throw QueryCompilationErrors.numberOfPartitionsNotAllowedWithUnspecifiedDistributionError()
-      } else {
-        query
-      }
-
-      val ordering = write.requiredOrdering.toSeq
-        .map(e => toCatalyst(e, query))
-        .asInstanceOf[Seq[SortOrder]]
-
-      val queryWithDistributionAndOrdering = if (ordering.nonEmpty) {
-        Sort(ordering, global = false, queryWithDistribution)
-      } else {
-        queryWithDistribution
-      }
-
-      queryWithDistributionAndOrdering
-
-    case _ =>
-      query
-  }
-}
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala
index 83b793925d..0d13f6a523 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ExtendedV2Writes.scala
@@ -22,100 +22,40 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.util.Optional
 import java.util.UUID
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
-import org.apache.spark.sql.catalyst.plans.logical.AppendData
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.OverwriteByExpression
-import org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.catalyst.plans.logical.ReplaceIcebergData
 import org.apache.spark.sql.catalyst.plans.logical.WriteIcebergDelta
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.WriteDeltaProjections
-import org.apache.spark.sql.catalyst.utils.PlanUtils.isIcebergRelation
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.write.DeltaWriteBuilder
 import org.apache.spark.sql.connector.write.LogicalWriteInfoImpl
-import org.apache.spark.sql.connector.write.SupportsDynamicOverwrite
-import org.apache.spark.sql.connector.write.SupportsOverwrite
-import org.apache.spark.sql.connector.write.SupportsTruncate
 import org.apache.spark.sql.connector.write.WriteBuilder
-import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.datasources.DataSourceStrategy
-import org.apache.spark.sql.sources.AlwaysTrue
-import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
 /**
- * A rule that is inspired by V2Writes in Spark but supports Iceberg transforms.
+ * A rule that is inspired by V2Writes in Spark but supports Iceberg specific plans.
  */
 object ExtendedV2Writes extends Rule[LogicalPlan] with PredicateHelper {
 
   import DataSourceV2Implicits._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
-    case a @ AppendData(r: DataSourceV2Relation, query, options, _, None, _) if isIcebergRelation(r) =>
-      val writeBuilder = newWriteBuilder(r.table, query.schema, options)
-      val write = writeBuilder.build()
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
-      a.copy(write = Some(write), query = newQuery)
-
-    case o @ OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, options, _, None, _)
-        if isIcebergRelation(r) =>
-      // fail if any filter cannot be converted. correctness depends on removing all matching data.
-      val filters = splitConjunctivePredicates(deleteExpr).flatMap { pred =>
-        val filter = DataSourceStrategy.translateFilter(pred, supportNestedPredicatePushdown = true)
-        if (filter.isEmpty) {
-          throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(pred)
-        }
-        filter
-      }.toArray
-
-      val table = r.table
-      val writeBuilder = newWriteBuilder(table, query.schema, options)
-      val write = writeBuilder match {
-        case builder: SupportsTruncate if isTruncate(filters) =>
-          builder.truncate().build()
-        case builder: SupportsOverwrite =>
-          builder.overwrite(filters).build()
-        case _ =>
-          throw QueryExecutionErrors.overwriteTableByUnsupportedExpressionError(table)
-      }
-
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
-      o.copy(write = Some(write), query = newQuery)
-
-    case o @ OverwritePartitionsDynamic(r: DataSourceV2Relation, query, options, _, None)
-        if isIcebergRelation(r) =>
-      val table = r.table
-      val writeBuilder = newWriteBuilder(table, query.schema, options)
-      val write = writeBuilder match {
-        case builder: SupportsDynamicOverwrite =>
-          builder.overwriteDynamicPartitions().build()
-        case _ =>
-          throw QueryExecutionErrors.dynamicPartitionOverwriteUnsupportedByTableError(table)
-      }
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
-      o.copy(write = Some(write), query = newQuery)
-
     case rd @ ReplaceIcebergData(r: DataSourceV2Relation, query, _, None) =>
       val rowSchema = StructType.fromAttributes(rd.dataInput)
       val writeBuilder = newWriteBuilder(r.table, rowSchema, Map.empty)
       val write = writeBuilder.build()
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(write, query, conf)
+      val newQuery = DistributionAndOrderingUtils.prepareQuery(write, query, r.funCatalog)
       rd.copy(write = Some(write), query = Project(rd.dataInput, newQuery))
 
     case wd @ WriteIcebergDelta(r: DataSourceV2Relation, query, _, projections, None) =>
       val deltaWriteBuilder = newDeltaWriteBuilder(r.table, Map.empty, projections)
       val deltaWrite = deltaWriteBuilder.build()
-      val newQuery = ExtendedDistributionAndOrderingUtils.prepareQuery(deltaWrite, query, conf)
+      val newQuery = DistributionAndOrderingUtils.prepareQuery(deltaWrite, query, r.funCatalog)
       wd.copy(write = Some(deltaWrite), query = newQuery)
   }
 
-  private def isTruncate(filters: Array[Filter]): Boolean = {
-    filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]
-  }
-
   private def newWriteBuilder(
       table: Table,
       rowSchema: StructType,
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
index f5d5affe9e..de26ea4486 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
+++ b/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelCommandDynamicPruning.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.catalyst.expressions.AttributeMap
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery
 import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.ExtendedV2ExpressionUtils
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.expressions.V2ExpressionUtils
 import org.apache.spark.sql.catalyst.planning.RewrittenRowLevelCommand
 import org.apache.spark.sql.catalyst.plans.LeftSemi
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromIcebergTable
@@ -80,8 +80,8 @@ case class RowLevelCommandDynamicPruning(spark: SparkSession) extends Rule[Logic
           val matchingRowsPlan = buildMatchingRowsPlan(relation, command)
 
           val filterAttrs = ArraySeq.unsafeWrapArray(scan.filterAttributes)
-          val buildKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
-          val pruningKeys = ExtendedV2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
+          val buildKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, matchingRowsPlan)
+          val pruningKeys = V2ExpressionUtils.resolveRefs[Attribute](filterAttrs, r)
           val dynamicPruningCond = buildDynamicPruningCond(matchingRowsPlan, buildKeys, pruningKeys)
 
           Filter(dynamicPruningCond, r)
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestIcebergExpressions.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestIcebergExpressions.java
deleted file mode 100644
index 8d2e10ea17..0000000000
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestIcebergExpressions.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.spark.extensions;
-
-import java.math.BigDecimal;
-import java.util.Map;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.spark.sql.Column;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.IcebergTruncateTransform;
-import org.junit.After;
-import org.junit.Test;
-
-public class TestIcebergExpressions extends SparkExtensionsTestBase {
-
-  public TestIcebergExpressions(
-      String catalogName, String implementation, Map<String, String> config) {
-    super(catalogName, implementation, config);
-  }
-
-  @After
-  public void removeTables() {
-    sql("DROP TABLE IF EXISTS %s", tableName);
-    sql("DROP VIEW IF EXISTS emp");
-    sql("DROP VIEW IF EXISTS v");
-  }
-
-  @Test
-  public void testTruncateExpressions() {
-    sql(
-        "CREATE TABLE %s ( "
-            + "  int_c INT, long_c LONG, dec_c DECIMAL(4, 2), str_c STRING, binary_c BINARY "
-            + ") USING iceberg",
-        tableName);
-
-    sql(
-        "CREATE TEMPORARY VIEW emp "
-            + "AS SELECT * FROM VALUES (101, 10001, 10.65, '101-Employee', CAST('1234' AS BINARY)) "
-            + "AS EMP(int_c, long_c, dec_c, str_c, binary_c)");
-
-    sql("INSERT INTO %s SELECT * FROM emp", tableName);
-
-    Dataset<Row> df = spark.sql("SELECT * FROM " + tableName);
-    df.select(
-            new Column(new IcebergTruncateTransform(df.col("int_c").expr(), 2)).as("int_c"),
-            new Column(new IcebergTruncateTransform(df.col("long_c").expr(), 2)).as("long_c"),
-            new Column(new IcebergTruncateTransform(df.col("dec_c").expr(), 50)).as("dec_c"),
-            new Column(new IcebergTruncateTransform(df.col("str_c").expr(), 2)).as("str_c"),
-            new Column(new IcebergTruncateTransform(df.col("binary_c").expr(), 2)).as("binary_c"))
-        .createOrReplaceTempView("v");
-
-    assertEquals(
-        "Should have expected rows",
-        ImmutableList.of(row(100, 10000L, new BigDecimal("10.50"), "10", "12")),
-        sql("SELECT int_c, long_c, dec_c, str_c, CAST(binary_c AS STRING) FROM v"));
-  }
-}
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index 44aca898b6..6cda93f867 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -41,9 +41,11 @@ import org.apache.spark.sql.AnalysisException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
+import org.apache.spark.sql.internal.SQLConf;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
@@ -55,6 +57,12 @@ public class TestRewriteDataFilesProcedure extends SparkExtensionsTestBase {
     super(catalogName, implementation, config);
   }
 
+  @BeforeClass
+  public static void setupSpark() {
+    // disable AQE as tests assume that writes generate a particular number of files
+    spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false");
+  }
+
   @After
   public void removeTable() {
     sql("DROP TABLE IF EXISTS %s", tableName);
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
index 2e5e383baf..38f15a4295 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
@@ -18,18 +18,13 @@
  */
 package org.apache.iceberg.spark;
 
-import org.apache.iceberg.spark.functions.SparkFunctions;
 import org.apache.iceberg.spark.procedures.SparkProcedures;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.spark.source.HasIcebergCatalog;
-import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
-import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
-import org.apache.spark.sql.connector.catalog.FunctionCatalog;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
-import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
 
@@ -38,7 +33,7 @@ abstract class BaseCatalog
         ProcedureCatalog,
         SupportsNamespaces,
         HasIcebergCatalog,
-        FunctionCatalog {
+        SupportsFunctions {
 
   @Override
   public Procedure loadProcedure(Identifier ident) throws NoSuchProcedureException {
@@ -58,35 +53,17 @@ abstract class BaseCatalog
   }
 
   @Override
-  public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
-    if (namespace.length == 0 || isSystemNamespace(namespace)) {
-      return SparkFunctions.list().stream()
-          .map(name -> Identifier.of(namespace, name))
-          .toArray(Identifier[]::new);
-    } else if (namespaceExists(namespace)) {
-      return new Identifier[0];
-    }
-
-    throw new NoSuchNamespaceException(namespace);
-  }
-
-  @Override
-  public UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
-    String[] namespace = ident.namespace();
-    String name = ident.name();
-
+  public boolean isFunctionNamespace(String[] namespace) {
     // Allow for empty namespace, as Spark's storage partitioned joins look up
     // the corresponding functions to generate transforms for partitioning
     // with an empty namespace, such as `bucket`.
     // Otherwise, use `system` namespace.
-    if (namespace.length == 0 || isSystemNamespace(namespace)) {
-      UnboundFunction func = SparkFunctions.load(name);
-      if (func != null) {
-        return func;
-      }
-    }
+    return namespace.length == 0 || isSystemNamespace(namespace);
+  }
 
-    throw new NoSuchFunctionException(ident);
+  @Override
+  public boolean isExistingNamespace(String[] namespace) {
+    return namespaceExists(namespace);
   }
 
   private static boolean isSystemNamespace(String[] namespace) {
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SortOrderToSpark.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SortOrderToSpark.java
index 52d68db2e4..781f61b33f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SortOrderToSpark.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SortOrderToSpark.java
@@ -53,7 +53,7 @@ class SortOrderToSpark implements SortOrderVisitor<SortOrder> {
       String sourceName, int id, int width, SortDirection direction, NullOrder nullOrder) {
     return Expressions.sort(
         Expressions.apply(
-            "truncate", Expressions.column(quotedName(id)), Expressions.literal(width)),
+            "truncate", Expressions.literal(width), Expressions.column(quotedName(id))),
         toSpark(direction),
         toSpark(nullOrder));
   }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
index 2533b3bd75..21317526d2 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkCachedTableCatalog.java
@@ -43,7 +43,7 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 /** An internal table catalog that is capable of loading tables from a cache. */
-public class SparkCachedTableCatalog implements TableCatalog {
+public class SparkCachedTableCatalog implements TableCatalog, SupportsFunctions {
 
   private static final String CLASS_NAME = SparkCachedTableCatalog.class.getName();
   private static final Splitter COMMA = Splitter.on(",");
diff --git a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/connector/expressions/TruncateTransform.scala b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkFunctionCatalog.java
similarity index 55%
rename from spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/connector/expressions/TruncateTransform.scala
rename to spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkFunctionCatalog.java
index 2a3269e2db..2183b9e5df 100644
--- a/spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/connector/expressions/TruncateTransform.scala
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkFunctionCatalog.java
@@ -16,23 +16,30 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iceberg.spark;
 
-package org.apache.spark.sql.connector.expressions
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-import org.apache.spark.sql.types.IntegerType
+/**
+ * A function catalog that can be used to resolve Iceberg functions without a metastore connection.
+ */
+public class SparkFunctionCatalog implements SupportsFunctions {
+
+  private static final SparkFunctionCatalog INSTANCE = new SparkFunctionCatalog();
+
+  private String name = "iceberg-function-catalog";
+
+  public static SparkFunctionCatalog get() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void initialize(String catalogName, CaseInsensitiveStringMap options) {
+    this.name = catalogName;
+  }
 
-private[sql] object TruncateTransform {
-  def unapply(expr: Expression): Option[(Int, FieldReference)] = expr match {
-    case transform: Transform =>
-      transform match {
-        case NamedTransform("truncate", Seq(Ref(seq: Seq[String]), Lit(value: Int, IntegerType))) =>
-          Some((value, FieldReference(seq)))
-        case NamedTransform("truncate", Seq(Lit(value: Int, IntegerType), Ref(seq: Seq[String]))) =>
-          Some((value, FieldReference(seq)))
-        case _ =>
-          None
-      }
-    case _ =>
-      None
+  @Override
+  public String name() {
+    return name;
   }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SupportsFunctions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SupportsFunctions.java
new file mode 100644
index 0000000000..34897d2b4c
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SupportsFunctions.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import org.apache.iceberg.spark.functions.SparkFunctions;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.connector.catalog.FunctionCatalog;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+
+interface SupportsFunctions extends FunctionCatalog {
+
+  default boolean isFunctionNamespace(String[] namespace) {
+    return namespace.length == 0;
+  }
+
+  default boolean isExistingNamespace(String[] namespace) {
+    return namespace.length == 0;
+  }
+
+  default Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
+    if (isFunctionNamespace(namespace)) {
+      return SparkFunctions.list().stream()
+          .map(name -> Identifier.of(namespace, name))
+          .toArray(Identifier[]::new);
+    } else if (isExistingNamespace(namespace)) {
+      return new Identifier[0];
+    }
+
+    throw new NoSuchNamespaceException(namespace);
+  }
+
+  default UnboundFunction loadFunction(Identifier ident) throws NoSuchFunctionException {
+    String[] namespace = ident.namespace();
+    String name = ident.name();
+
+    if (isFunctionNamespace(namespace)) {
+      UnboundFunction func = SparkFunctions.load(name);
+      if (func != null) {
+        return func;
+      }
+    }
+
+    throw new NoSuchFunctionException(ident);
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
index 1add6383c6..53d5f49b9f 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java
@@ -21,11 +21,13 @@ package org.apache.iceberg.spark.actions;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
+import org.apache.iceberg.spark.SparkFunctionCatalog;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.util.PropertyUtil;
@@ -34,11 +36,13 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
-import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$;
+import org.apache.spark.sql.connector.distributions.Distribution;
 import org.apache.spark.sql.connector.distributions.Distributions;
 import org.apache.spark.sql.connector.distributions.OrderedDistribution;
 import org.apache.spark.sql.connector.expressions.SortOrder;
-import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
+import org.apache.spark.sql.execution.datasources.v2.DistributionAndOrderingUtils$;
+import scala.Option;
 
 abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
@@ -61,7 +65,10 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
     super(spark, table);
   }
 
-  protected abstract Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group);
+  protected abstract org.apache.iceberg.SortOrder sortOrder();
+
+  protected abstract Dataset<Row> sortedDF(
+      Dataset<Row> df, Function<Dataset<Row>, Dataset<Row>> sortFunc);
 
   @Override
   public Set<String> validOptions() {
@@ -79,9 +86,6 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
 
   @Override
   public void doRewrite(String groupId, List<FileScanTask> group) {
-    // the number of shuffle partition controls the number of output files
-    spark().conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), numShufflePartitions(group));
-
     Dataset<Row> scanDF =
         spark()
             .read()
@@ -89,7 +93,7 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
             .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId)
             .load(groupId);
 
-    Dataset<Row> sortedDF = sortedDF(scanDF, group);
+    Dataset<Row> sortedDF = sortedDF(scanDF, sortFunction(group));
 
     sortedDF
         .write()
@@ -101,30 +105,35 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
         .save(groupId);
   }
 
-  protected Dataset<Row> sort(Dataset<Row> df, org.apache.iceberg.SortOrder sortOrder) {
-    SortOrder[] ordering = SparkDistributionAndOrderingUtil.convert(sortOrder);
-    OrderedDistribution distribution = Distributions.ordered(ordering);
-    SQLConf conf = spark().sessionState().conf();
-    LogicalPlan plan = df.logicalPlan();
-    LogicalPlan sortPlan =
-        DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, ordering, plan, conf);
-    return new Dataset<>(spark(), sortPlan, df.encoder());
+  private Function<Dataset<Row>, Dataset<Row>> sortFunction(List<FileScanTask> group) {
+    SortOrder[] ordering = SparkDistributionAndOrderingUtil.convert(outputSortOrder(group));
+    int numShufflePartitions = numShufflePartitions(group);
+    return (df) -> transformPlan(df, plan -> sortPlan(plan, ordering, numShufflePartitions));
   }
 
-  protected org.apache.iceberg.SortOrder outputSortOrder(
-      List<FileScanTask> group, org.apache.iceberg.SortOrder sortOrder) {
+  private LogicalPlan sortPlan(LogicalPlan plan, SortOrder[] ordering, int numShufflePartitions) {
+    SparkFunctionCatalog catalog = SparkFunctionCatalog.get();
+    OrderedWrite write = new OrderedWrite(ordering, numShufflePartitions);
+    return DistributionAndOrderingUtils$.MODULE$.prepareQuery(write, plan, Option.apply(catalog));
+  }
+
+  private Dataset<Row> transformPlan(Dataset<Row> df, Function<LogicalPlan, LogicalPlan> func) {
+    return new Dataset<>(spark(), func.apply(df.logicalPlan()), df.encoder());
+  }
+
+  private org.apache.iceberg.SortOrder outputSortOrder(List<FileScanTask> group) {
     boolean includePartitionColumns = !group.get(0).spec().equals(table().spec());
     if (includePartitionColumns) {
       // build in the requirement for partition sorting into our sort order
       // as the original spec for this group does not match the output spec
-      return SortOrderUtil.buildSortOrder(table(), sortOrder);
+      return SortOrderUtil.buildSortOrder(table(), sortOrder());
     } else {
-      return sortOrder;
+      return sortOrder();
     }
   }
 
-  private long numShufflePartitions(List<FileScanTask> group) {
-    long numOutputFiles = numOutputFiles((long) (inputSize(group) * compressionFactor));
+  private int numShufflePartitions(List<FileScanTask> group) {
+    int numOutputFiles = (int) numOutputFiles((long) (inputSize(group) * compressionFactor));
     return Math.max(1, numOutputFiles);
   }
 
@@ -135,4 +144,36 @@ abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter {
         value > 0, "'%s' is set to %s but must be > 0", COMPRESSION_FACTOR, value);
     return value;
   }
+
+  private static class OrderedWrite implements RequiresDistributionAndOrdering {
+    private final OrderedDistribution distribution;
+    private final SortOrder[] ordering;
+    private final int numShufflePartitions;
+
+    OrderedWrite(SortOrder[] ordering, int numShufflePartitions) {
+      this.distribution = Distributions.ordered(ordering);
+      this.ordering = ordering;
+      this.numShufflePartitions = numShufflePartitions;
+    }
+
+    @Override
+    public Distribution requiredDistribution() {
+      return distribution;
+    }
+
+    @Override
+    public boolean distributionStrictlyRequired() {
+      return true;
+    }
+
+    @Override
+    public int requiredNumPartitions() {
+      return numShufflePartitions;
+    }
+
+    @Override
+    public SortOrder[] requiredOrdering() {
+      return ordering;
+    }
+  }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java
index 4615f3cebc..1f70d4d7ca 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java
@@ -18,8 +18,7 @@
  */
 package org.apache.iceberg.spark.actions;
 
-import java.util.List;
-import org.apache.iceberg.FileScanTask;
+import java.util.function.Function;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -54,7 +53,12 @@ class SparkSortDataRewriter extends SparkShufflingDataRewriter {
   }
 
   @Override
-  protected Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group) {
-    return sort(df, outputSortOrder(group, sortOrder));
+  protected SortOrder sortOrder() {
+    return sortOrder;
+  }
+
+  @Override
+  protected Dataset<Row> sortedDF(Dataset<Row> df, Function<Dataset<Row>, Dataset<Row>> sortFunc) {
+    return sortFunc.apply(df);
   }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
index 68db76d37f..91eaa91f68 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java
@@ -23,7 +23,7 @@ import static org.apache.spark.sql.functions.array;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import org.apache.iceberg.FileScanTask;
+import java.util.function.Function;
 import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortDirection;
@@ -104,9 +104,14 @@ class SparkZOrderDataRewriter extends SparkShufflingDataRewriter {
   }
 
   @Override
-  protected Dataset<Row> sortedDF(Dataset<Row> df, List<FileScanTask> group) {
+  protected SortOrder sortOrder() {
+    return Z_SORT_ORDER;
+  }
+
+  @Override
+  protected Dataset<Row> sortedDF(Dataset<Row> df, Function<Dataset<Row>, Dataset<Row>> sortFunc) {
     Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zValue(df));
-    Dataset<Row> sortedDF = sort(zValueDF, outputSortOrder(group, Z_SORT_ORDER));
+    Dataset<Row> sortedDF = sortFunc.apply(zValueDF);
     return sortedDF.drop(Z_COLUMN);
   }
 
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
index 133ca45b46..30f04659df 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWriteBuilder.java
@@ -27,7 +27,6 @@ import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkSchemaUtil;
@@ -146,15 +145,8 @@ class SparkWriteBuilder implements WriteBuilder, SupportsDynamicOverwrite, Suppo
     SortOrder[] ordering;
 
     if (useTableDistributionAndOrdering) {
-      if (Spark3Util.extensionsEnabled(spark) || allIdentityTransforms(table.spec())) {
-        distribution = buildRequiredDistribution();
-        ordering = buildRequiredOrdering(distribution);
-      } else {
-        LOG.warn(
-            "Skipping distribution/ordering: extensions are disabled and spec contains unsupported transforms");
-        distribution = Distributions.unspecified();
-        ordering = NO_ORDERING;
-      }
+      distribution = buildRequiredDistribution();
+      ordering = buildRequiredOrdering(distribution);
     } else {
       LOG.info("Skipping distribution/ordering: disabled per job configuration");
       distribution = Distributions.unspecified();
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpressions.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpressions.scala
deleted file mode 100644
index dffac82af7..0000000000
--- a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/expressions/TransformExpressions.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.catalyst.expressions
-
-import java.nio.ByteBuffer
-import java.nio.CharBuffer
-import java.nio.charset.StandardCharsets
-import java.util.function
-import org.apache.iceberg.spark.SparkSchemaUtil
-import org.apache.iceberg.transforms.Transform
-import org.apache.iceberg.transforms.Transforms
-import org.apache.iceberg.types.Type
-import org.apache.iceberg.types.Types
-import org.apache.iceberg.util.ByteBuffers
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.types.AbstractDataType
-import org.apache.spark.sql.types.BinaryType
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.types.Decimal
-import org.apache.spark.sql.types.DecimalType
-import org.apache.spark.sql.types.IntegerType
-import org.apache.spark.sql.types.StringType
-import org.apache.spark.sql.types.TimestampType
-import org.apache.spark.unsafe.types.UTF8String
-
-abstract class IcebergTransformExpression
-  extends UnaryExpression with CodegenFallback with NullIntolerant {
-
-  @transient lazy val icebergInputType: Type = SparkSchemaUtil.convert(child.dataType)
-}
-
-abstract class IcebergTimeTransform
-  extends IcebergTransformExpression with ImplicitCastInputTypes {
-
-  def transform: function.Function[Any, Integer]
-
-  override protected def nullSafeEval(value: Any): Any = {
-    transform(value).toInt
-  }
-
-  override def dataType: DataType = IntegerType
-
-  override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
-}
-
-case class IcebergYearTransform(child: Expression)
-  extends IcebergTimeTransform {
-
-  @transient lazy val transform: function.Function[Any, Integer] = Transforms.year[Any]().bind(icebergInputType)
-
-  override protected def withNewChildInternal(newChild: Expression): Expression = {
-    copy(child = newChild)
-  }
-}
-
-case class IcebergMonthTransform(child: Expression)
-  extends IcebergTimeTransform {
-
-  @transient lazy val transform: function.Function[Any, Integer] = Transforms.month[Any]().bind(icebergInputType)
-
-  override protected def withNewChildInternal(newChild: Expression): Expression = {
-    copy(child = newChild)
-  }
-}
-
-case class IcebergDayTransform(child: Expression)
-  extends IcebergTimeTransform {
-
-  @transient lazy val transform: function.Function[Any, Integer] = Transforms.day[Any]().bind(icebergInputType)
-
-  override protected def withNewChildInternal(newChild: Expression): Expression = {
-    copy(child = newChild)
-  }
-}
-
-case class IcebergHourTransform(child: Expression)
-  extends IcebergTimeTransform {
-
-  @transient lazy val transform: function.Function[Any, Integer] = Transforms.hour[Any]().bind(icebergInputType)
-
-  override protected def withNewChildInternal(newChild: Expression): Expression = {
-    copy(child = newChild)
-  }
-}
-
-case class IcebergBucketTransform(numBuckets: Int, child: Expression) extends IcebergTransformExpression {
-
-  @transient lazy val bucketFunc: Any => Int = child.dataType match {
-    case _: DecimalType =>
-      val t = Transforms.bucket[Any](numBuckets).bind(icebergInputType)
-      d: Any => t(d.asInstanceOf[Decimal].toJavaBigDecimal).toInt
-    case _: StringType =>
-      // the spec requires that the hash of a string is equal to the hash of its UTF-8 encoded bytes
-      // TODO: pass bytes without the copy out of the InternalRow
-      val t = Transforms.bucket[ByteBuffer](numBuckets).bind(Types.BinaryType.get())
-      s: Any => t(ByteBuffer.wrap(s.asInstanceOf[UTF8String].getBytes)).toInt
-    case _ =>
-      val t = Transforms.bucket[Any](numBuckets).bind(icebergInputType)
-      a: Any => t(a).toInt
-  }
-
-  override protected def nullSafeEval(value: Any): Any = {
-    bucketFunc(value)
-  }
-
-  override def dataType: DataType = IntegerType
-
-  override protected def withNewChildInternal(newChild: Expression): Expression = {
-    copy(child = newChild)
-  }
-}
-
-case class IcebergTruncateTransform(child: Expression, width: Int) extends IcebergTransformExpression {
-
-  @transient lazy val truncateFunc: Any => Any = child.dataType match {
-    case _: DecimalType =>
-      val t = Transforms.truncate[java.math.BigDecimal](width).bind(icebergInputType)
-      d: Any => Decimal.apply(t(d.asInstanceOf[Decimal].toJavaBigDecimal))
-    case _: StringType =>
-      val t = Transforms.truncate[CharSequence](width).bind(icebergInputType)
-      s: Any => {
-        val charSequence = t(StandardCharsets.UTF_8.decode(ByteBuffer.wrap(s.asInstanceOf[UTF8String].getBytes)))
-        val bb = StandardCharsets.UTF_8.encode(CharBuffer.wrap(charSequence));
-        UTF8String.fromBytes(ByteBuffers.toByteArray(bb))
-      }
-    case _: BinaryType =>
-      val t = Transforms.truncate[ByteBuffer](width).bind(icebergInputType)
-      s: Any => ByteBuffers.toByteArray(t(ByteBuffer.wrap(s.asInstanceOf[Array[Byte]])))
-    case _ =>
-      val t = Transforms.truncate[Any](width).bind(icebergInputType)
-      a: Any => t(a)
-  }
-
-  override protected def nullSafeEval(value: Any): Any = {
-    truncateFunc(value)
-  }
-
-  override def dataType: DataType = child.dataType
-
-  override protected def withNewChildInternal(newChild: Expression): Expression = {
-    copy(child = newChild)
-  }
-}
diff --git a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala b/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
deleted file mode 100644
index 94b6f651a0..0000000000
--- a/spark/v3.4/spark/src/main/scala/org/apache/spark/sql/catalyst/utils/DistributionAndOrderingUtils.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.catalyst.utils
-
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst
-import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.expressions.IcebergBucketTransform
-import org.apache.spark.sql.catalyst.expressions.IcebergDayTransform
-import org.apache.spark.sql.catalyst.expressions.IcebergHourTransform
-import org.apache.spark.sql.catalyst.expressions.IcebergMonthTransform
-import org.apache.spark.sql.catalyst.expressions.IcebergTruncateTransform
-import org.apache.spark.sql.catalyst.expressions.IcebergYearTransform
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
-import org.apache.spark.sql.catalyst.plans.logical.Sort
-import org.apache.spark.sql.connector.distributions.ClusteredDistribution
-import org.apache.spark.sql.connector.distributions.Distribution
-import org.apache.spark.sql.connector.distributions.OrderedDistribution
-import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution
-import org.apache.spark.sql.connector.expressions.ApplyTransform
-import org.apache.spark.sql.connector.expressions.BucketTransform
-import org.apache.spark.sql.connector.expressions.DaysTransform
-import org.apache.spark.sql.connector.expressions.Expression
-import org.apache.spark.sql.connector.expressions.FieldReference
-import org.apache.spark.sql.connector.expressions.HoursTransform
-import org.apache.spark.sql.connector.expressions.IdentityTransform
-import org.apache.spark.sql.connector.expressions.Literal
-import org.apache.spark.sql.connector.expressions.MonthsTransform
-import org.apache.spark.sql.connector.expressions.NamedReference
-import org.apache.spark.sql.connector.expressions.NullOrdering
-import org.apache.spark.sql.connector.expressions.SortDirection
-import org.apache.spark.sql.connector.expressions.SortOrder
-import org.apache.spark.sql.connector.expressions.Transform
-import org.apache.spark.sql.connector.expressions.YearsTransform
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.types.IntegerType
-import scala.collection.compat.immutable.ArraySeq
-
-object DistributionAndOrderingUtils {
-
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
-
-  def prepareQuery(
-      requiredDistribution: Distribution,
-      requiredOrdering: Array[SortOrder],
-      query: LogicalPlan,
-      conf: SQLConf): LogicalPlan = {
-
-    val resolver = conf.resolver
-
-    val distribution = requiredDistribution match {
-      case d: OrderedDistribution =>
-        d.ordering.map(e => toCatalyst(e, query, resolver))
-      case d: ClusteredDistribution =>
-        d.clustering.map(e => toCatalyst(e, query, resolver))
-      case _: UnspecifiedDistribution =>
-        Array.empty[catalyst.expressions.Expression]
-    }
-
-    val queryWithDistribution = if (distribution.nonEmpty) {
-      // the conversion to catalyst expressions above produces SortOrder expressions
-      // for OrderedDistribution and generic expressions for ClusteredDistribution
-      // this allows RepartitionByExpression to pick either range or hash partitioning
-      RepartitionByExpression(distribution.toSeq, query, None)
-    } else {
-      query
-    }
-
-    val ordering = requiredOrdering
-      .map(e => toCatalyst(e, query, resolver).asInstanceOf[catalyst.expressions.SortOrder])
-
-    val queryWithDistributionAndOrdering = if (ordering.nonEmpty) {
-      Sort(ArraySeq.unsafeWrapArray(ordering), global = false, queryWithDistribution)
-    } else {
-      queryWithDistribution
-    }
-
-    queryWithDistributionAndOrdering
-  }
-
-  private def toCatalyst(
-      expr: Expression,
-      query: LogicalPlan,
-      resolver: Resolver): catalyst.expressions.Expression = {
-
-    // we cannot perform the resolution in the analyzer since we need to optimize expressions
-    // in nodes like OverwriteByExpression before constructing a logical write
-    def resolve(parts: Seq[String]): NamedExpression = {
-      query.resolve(parts, resolver) match {
-        case Some(attr) =>
-          attr
-        case None =>
-          val ref = parts.quoted
-          throw new AnalysisException(s"Cannot resolve '$ref' using ${query.output}")
-      }
-    }
-
-    expr match {
-      case s: SortOrder =>
-        val catalystChild = toCatalyst(s.expression(), query, resolver)
-        catalyst.expressions.SortOrder(catalystChild, toCatalyst(s.direction), toCatalyst(s.nullOrdering), Seq.empty)
-      case it: IdentityTransform =>
-        resolve(ArraySeq.unsafeWrapArray(it.ref.fieldNames))
-      case BucketTransform(numBuckets, ref) =>
-        IcebergBucketTransform(numBuckets, resolve(ArraySeq.unsafeWrapArray(ref.fieldNames)))
-      case TruncateTransform(ref, width) =>
-        IcebergTruncateTransform(resolve(ArraySeq.unsafeWrapArray(ref.fieldNames)), width)
-      case yt: YearsTransform =>
-        IcebergYearTransform(resolve(ArraySeq.unsafeWrapArray(yt.ref.fieldNames)))
-      case mt: MonthsTransform =>
-        IcebergMonthTransform(resolve(ArraySeq.unsafeWrapArray(mt.ref.fieldNames)))
-      case dt: DaysTransform =>
-        IcebergDayTransform(resolve(ArraySeq.unsafeWrapArray(dt.ref.fieldNames)))
-      case ht: HoursTransform =>
-        IcebergHourTransform(resolve(ArraySeq.unsafeWrapArray(ht.ref.fieldNames)))
-      case ref: FieldReference =>
-        resolve(ArraySeq.unsafeWrapArray(ref.fieldNames))
-      case _ =>
-        throw new RuntimeException(s"$expr is not currently supported")
-
-    }
-  }
-
-  private def toCatalyst(direction: SortDirection): catalyst.expressions.SortDirection = {
-    direction match {
-      case SortDirection.ASCENDING => catalyst.expressions.Ascending
-      case SortDirection.DESCENDING => catalyst.expressions.Descending
-    }
-  }
-
-  private def toCatalyst(nullOrdering: NullOrdering): catalyst.expressions.NullOrdering = {
-    nullOrdering match {
-      case NullOrdering.NULLS_FIRST => catalyst.expressions.NullsFirst
-      case NullOrdering.NULLS_LAST => catalyst.expressions.NullsLast
-    }
-  }
-
-  private object BucketTransform {
-    def unapply(transform: Transform): Option[(Int, FieldReference)] = transform match {
-      case bt: BucketTransform => bt.columns match {
-        case Seq(nf: NamedReference) =>
-          Some(bt.numBuckets.value(), FieldReference(ArraySeq.unsafeWrapArray(nf.fieldNames())))
-        case _ =>
-          None
-      }
-      case _ => None
-    }
-  }
-
-  private object Lit {
-    def unapply[T](literal: Literal[T]): Some[(T, DataType)] = {
-      Some((literal.value, literal.dataType))
-    }
-  }
-
-  private object TruncateTransform {
-    def unapply(transform: Transform): Option[(FieldReference, Int)] = transform match {
-      case at @ ApplyTransform(name, _) if name.equalsIgnoreCase("truncate")  => at.args match {
-        case Seq(nf: NamedReference, Lit(value: Int, IntegerType)) =>
-          Some(FieldReference(ArraySeq.unsafeWrapArray(nf.fieldNames())), value)
-        case Seq(Lit(value: Int, IntegerType), nf: NamedReference) =>
-          Some(FieldReference(ArraySeq.unsafeWrapArray(nf.fieldNames())), value)
-        case _ =>
-          None
-      }
-      case _ => None
-    }
-  }
-}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index 536dd5febb..d91ac3606d 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -463,9 +463,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
         "Should not delete any files", Iterables.isEmpty(result.orphanFileLocations()));
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
-    List<ThreeColumnRecord> actualRecords =
-        resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-    Assert.assertEquals("Rows must match", records, actualRecords);
+    Assert.assertEquals("Rows count must match", records.size(), resultDF.count());
   }
 
   @Test
@@ -492,9 +490,7 @@ public abstract class TestRemoveOrphanFilesAction extends SparkTestBase {
         "Should not delete any files", Iterables.isEmpty(result.orphanFileLocations()));
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
-    List<ThreeColumnRecord> actualRecords =
-        resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-    Assert.assertEquals("Rows must match", records, actualRecords);
+    Assert.assertEquals("Row count must match", records.size(), resultDF.count());
   }
 
   @Test
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 761284bb56..3ecd7ce371 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -93,6 +93,7 @@ import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.ScanTaskSetManager;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.actions.RewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Comparators;
@@ -102,8 +103,10 @@ import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.util.Pair;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.internal.SQLConf;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -127,6 +130,12 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
   private final ScanTaskSetManager manager = ScanTaskSetManager.get();
   private String tableLocation = null;
 
+  @BeforeClass
+  public static void setupSpark() {
+    // disable AQE as tests assume that writes generate a particular number of files
+    spark.conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "false");
+  }
+
   @Before
   public void setupTableLocation() throws Exception {
     File tableDir = temp.newFolder();
@@ -1630,6 +1639,7 @@ public class TestRewriteDataFilesAction extends SparkTestBase {
         .write()
         .format("iceberg")
         .mode("append")
+        .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
         .save(tableLocation);
   }
 
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index 521d90299d..ac481ca473 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -215,7 +215,7 @@ public class TestRequiredDistributionAndOrdering extends SparkCatalogTestBase {
   }
 
   @Test
-  public void testNoSortBucketTransformsWithoutExtensions() throws NoSuchTableException {
+  public void testSortBucketTransformsWithoutExtensions() throws NoSuchTableException {
     sql(
         "CREATE TABLE %s (c1 INT, c2 STRING, c3 STRING) "
             + "USING iceberg "
@@ -231,20 +231,7 @@ public class TestRequiredDistributionAndOrdering extends SparkCatalogTestBase {
     Dataset<Row> ds = spark.createDataFrame(data, ThreeColumnRecord.class);
     Dataset<Row> inputDF = ds.coalesce(1).sortWithinPartitions("c1");
 
-    // should fail by default as extensions are disabled
-    AssertHelpers.assertThrowsCause(
-        "Should reject writes without ordering",
-        IllegalStateException.class,
-        "Incoming records violate the writer assumption",
-        () -> {
-          try {
-            inputDF.writeTo(tableName).append();
-          } catch (NoSuchTableException e) {
-            throw new RuntimeException(e);
-          }
-        });
-
-    inputDF.writeTo(tableName).option(SparkWriteOptions.FANOUT_ENABLED, "true").append();
+    inputDF.writeTo(tableName).append();
 
     List<Object[]> expected =
         ImmutableList.of(