You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2022/03/04 11:27:43 UTC

[hive] branch master updated: HIVE-25975: Optimize ClusteredWriter for bucketed Iceberg tables (#3060) (Adam Szita, reviewed by Peter Vary and Marton Bod)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bded40  HIVE-25975: Optimize ClusteredWriter for bucketed Iceberg tables (#3060) (Adam Szita, reviewed by Peter Vary and Marton Bod)
8bded40 is described below

commit 8bded407ec3bf782db16f569da7ddc2c5a235628
Author: Adam Szita <40...@users.noreply.github.com>
AuthorDate: Fri Mar 4 12:27:26 2022 +0100

    HIVE-25975: Optimize ClusteredWriter for bucketed Iceberg tables (#3060) (Adam Szita, reviewed by Peter Vary and Marton Bod)
---
 .../iceberg/mr/hive/GenericUDFIcebergBucket.java   | 201 +++++++++++
 .../apache/iceberg/mr/hive/HiveIcebergSerDe.java   |   7 +
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  64 +++-
 .../iceberg/mr/hive/HiveIcebergTestUtils.java      |   4 +-
 .../mr/hive/TestHiveIcebergOutputCommitter.java    |  18 +-
 .../queries/positive/dynamic_partition_writes.q    |  46 ++-
 .../positive/dynamic_partition_writes.q.out        | 366 +++++++++++++++++++--
 iceberg/patched-iceberg-core/pom.xml               |   3 +-
 .../org/apache/iceberg/io/ClusteredWriter.java     | 158 ---------
 .../hadoop/hive/ql/exec/FunctionRegistry.java      |   7 +
 .../hive/ql/metadata/HiveStorageHandler.java       |  18 +
 .../ql/optimizer/SortedDynPartitionOptimizer.java  | 163 ++++++---
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  10 +-
 .../hadoop/hive/ql/plan/DynamicPartitionCtx.java   |  21 ++
 14 files changed, 822 insertions(+), 264 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java
new file mode 100644
index 0000000..cab4bb1
--- /dev/null
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/GenericUDFIcebergBucket.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.hive.ql.exec.Description;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorConverter;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantIntObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/**
+ * GenericUDFIcebergBucket - UDF that wraps around Iceberg's bucket transform function
+ */
+@Description(name = "iceberg_bucket",
+    value = "_FUNC_(value, bucketCount) - " +
+        "Returns the bucket value calculated by Iceberg bucket transform function ",
+    extended = "Example:\n  > SELECT _FUNC_('A bucket full of ice!', 5);\n  4")
+public class GenericUDFIcebergBucket extends GenericUDF {
+  private final IntWritable result = new IntWritable();
+  private int numBuckets = -1;
+  private transient PrimitiveObjectInspector argumentOI;
+  private transient ObjectInspectorConverters.Converter converter;
+
+  @FunctionalInterface
+  private interface UDFEvalFunction<T> {
+    void apply(T argument) throws HiveException;
+  }
+
+  private transient UDFEvalFunction<DeferredObject> evaluator;
+
+  @Override
+  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
+    if (arguments.length != 2) {
+      throw new UDFArgumentLengthException(
+          "ICEBERG_BUCKET requires 2 arguments (value, bucketCount), but got " + arguments.length);
+    }
+
+    numBuckets = getNumBuckets(arguments[1]);
+
+    if (arguments[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
+      throw new UDFArgumentException(
+          "ICEBERG_BUCKET first argument takes primitive types, got " + argumentOI.getTypeName());
+    }
+    argumentOI = (PrimitiveObjectInspector) arguments[0];
+
+    PrimitiveObjectInspector.PrimitiveCategory inputType = argumentOI.getPrimitiveCategory();
+    ObjectInspector outputOI = null;
+    switch (inputType) {
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        converter = new PrimitiveObjectInspectorConverter.StringConverter(argumentOI);
+        Transform<String, Integer> stringTransform = Transforms.bucket(Types.StringType.get(), numBuckets);
+        evaluator = arg -> {
+          String val = (String) converter.convert(arg.get());
+          result.set(stringTransform.apply(val));
+        };
+        break;
+
+      case BINARY:
+        converter = new PrimitiveObjectInspectorConverter.BinaryConverter(argumentOI,
+            PrimitiveObjectInspectorFactory.writableBinaryObjectInspector);
+        Transform<ByteBuffer, Integer> byteBufferTransform = Transforms.bucket(Types.BinaryType.get(), numBuckets);
+        evaluator = arg -> {
+          BytesWritable val = (BytesWritable) converter.convert(arg.get());
+          ByteBuffer byteBuffer = ByteBuffer.wrap(val.getBytes(), 0, val.getLength());
+          result.set(byteBufferTransform.apply(byteBuffer));
+        };
+        break;
+
+      case INT:
+        converter = new PrimitiveObjectInspectorConverter.IntConverter(argumentOI,
+            PrimitiveObjectInspectorFactory.writableIntObjectInspector);
+        Transform<Integer, Integer> intTransform = Transforms.bucket(Types.IntegerType.get(), numBuckets);
+        evaluator = arg -> {
+          IntWritable val = (IntWritable) converter.convert(arg.get());
+          result.set(intTransform.apply(val.get()));
+        };
+        break;
+
+      case LONG:
+        converter = new PrimitiveObjectInspectorConverter.LongConverter(argumentOI,
+            PrimitiveObjectInspectorFactory.writableLongObjectInspector);
+        Transform<Long, Integer> longTransform = Transforms.bucket(Types.LongType.get(), numBuckets);
+        evaluator = arg -> {
+          LongWritable val = (LongWritable) converter.convert(arg.get());
+          result.set(longTransform.apply(val.get()));
+        };
+        break;
+
+      case DECIMAL:
+        DecimalTypeInfo decimalTypeInfo = (DecimalTypeInfo) TypeInfoUtils.getTypeInfoFromObjectInspector(argumentOI);
+        Type.PrimitiveType decimalIcebergType = Types.DecimalType.of(decimalTypeInfo.getPrecision(),
+            decimalTypeInfo.getScale());
+
+        converter = new PrimitiveObjectInspectorConverter.HiveDecimalConverter(argumentOI,
+            PrimitiveObjectInspectorFactory.writableHiveDecimalObjectInspector);
+        Transform<BigDecimal, Integer> bigDecimalTransform = Transforms.bucket(decimalIcebergType, numBuckets);
+        evaluator = arg -> {
+          HiveDecimalWritable val = (HiveDecimalWritable) converter.convert(arg.get());
+          result.set(bigDecimalTransform.apply(val.getHiveDecimal().bigDecimalValue()));
+        };
+        break;
+
+      case FLOAT:
+        converter = new PrimitiveObjectInspectorConverter.FloatConverter(argumentOI,
+            PrimitiveObjectInspectorFactory.writableFloatObjectInspector);
+        Transform<Float, Integer> floatTransform = Transforms.bucket(Types.FloatType.get(), numBuckets);
+        evaluator = arg -> {
+          FloatWritable val = (FloatWritable) converter.convert(arg.get());
+          result.set(floatTransform.apply(val.get()));
+        };
+        break;
+
+      case DOUBLE:
+        converter = new PrimitiveObjectInspectorConverter.DoubleConverter(argumentOI,
+            PrimitiveObjectInspectorFactory.writableDoubleObjectInspector);
+        Transform<Double, Integer> doubleTransform = Transforms.bucket(Types.DoubleType.get(), numBuckets);
+        evaluator = arg -> {
+          DoubleWritable val = (DoubleWritable) converter.convert(arg.get());
+          result.set(doubleTransform.apply(val.get()));
+        };
+        break;
+
+      default:
+        throw new UDFArgumentException(
+            " ICEBERG_BUCKET() only takes STRING/CHAR/VARCHAR/BINARY/INT/LONG/DECIMAL/FLOAT/DOUBLE" +
+                " types as first argument, got " + inputType);
+    }
+
+    outputOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+    return outputOI;
+  }
+
+  private static int getNumBuckets(ObjectInspector arg) throws UDFArgumentException {
+    UDFArgumentException udfArgumentException = new UDFArgumentException("ICEBERG_BUCKET() second argument can only " +
+        "take an int type, but got " + arg.getTypeName());
+    if (arg.getCategory() != ObjectInspector.Category.PRIMITIVE) {
+      throw udfArgumentException;
+    }
+    PrimitiveObjectInspector.PrimitiveCategory inputType = ((PrimitiveObjectInspector) arg).getPrimitiveCategory();
+    if (inputType != PrimitiveObjectInspector.PrimitiveCategory.INT) {
+      throw udfArgumentException;
+    }
+    return ((WritableConstantIntObjectInspector) arg).getWritableConstantValue().get();
+  }
+
+  @Override
+  public Object evaluate(DeferredObject[] arguments) throws HiveException {
+
+    DeferredObject argument = arguments[0];
+    if (argument == null) {
+      return null;
+    } else {
+      evaluator.apply(argument);
+    }
+
+    return result;
+  }
+
+  @Override
+  public String getDisplayString(String[] children) {
+    return getStandardDisplayString("iceberg_bucket", children);
+  }
+}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
index dc799cc..1ba824e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSerDe.java
@@ -28,6 +28,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import javax.annotation.Nullable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
@@ -146,6 +147,12 @@ public class HiveIcebergSerDe extends AbstractSerDe {
       }
     }
 
+    // Currently ClusteredWriter is used which requires that records are ordered by partition keys.
+    // Here we ensure that SortedDynPartitionOptimizer will kick in and do the sorting.
+    // TODO: remove once we have both Fanout and ClusteredWriter available: HIVE-25948
+    HiveConf.setIntVar(configuration, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITIONTHRESHOLD, 1);
+    HiveConf.setVar(configuration, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+
     try {
       this.inspector = IcebergObjectInspector.create(projectedSchema);
     } catch (Exception e) {
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 2dd92a3..a6777d3 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -31,6 +30,8 @@ import java.util.ListIterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -49,6 +51,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
 import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -63,6 +66,7 @@ import org.apache.hadoop.hive.ql.stats.Partish;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
@@ -85,7 +89,9 @@ import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTest
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.base.Throwables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SerializationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +102,25 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   private static final String ICEBERG_URI_PREFIX = "iceberg://";
   private static final Splitter TABLE_NAME_SPLITTER = Splitter.on("..");
   private static final String TABLE_NAME_SEPARATOR = "..";
+  /**
+   * Function template for producing a custom sort expression function:
+   * Takes the source column index and the bucket count to creat a function where Iceberg bucket UDF is used to build
+   * the sort expression, e.g. iceberg_bucket(_col2, 5)
+   */
+  private static final transient BiFunction<Integer, Integer, Function<List<ExprNodeDesc>, ExprNodeDesc>>
+      BUCKET_SORT_EXPR =
+          (idx, bucket) -> cols -> {
+            try {
+              ExprNodeDesc icebergBucketSourceCol = cols.get(idx);
+              return ExprNodeGenericFuncDesc.newInstance(new GenericUDFIcebergBucket(), "iceberg_bucket",
+                  Lists.newArrayList(
+                      icebergBucketSourceCol,
+                      new ExprNodeConstantDesc(TypeInfoFactory.intTypeInfo, bucket)
+                  ));
+            } catch (UDFArgumentException e) {
+              throw new RuntimeException(e);
+            }
+          };
 
   static final String WRITE_KEY = "HiveIcebergStorageHandler_write";
 
@@ -284,7 +309,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
 
   @Override
   public List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
-    List<PartitionTransformSpec> result = new ArrayList<>();
     TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
     Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
     return table.spec().fields().stream().map(f -> {
@@ -308,6 +332,42 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   }
 
   @Override
+  public DynamicPartitionCtx createDPContext(HiveConf hiveConf, org.apache.hadoop.hive.ql.metadata.Table hmsTable)
+      throws SemanticException {
+    TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
+    Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
+    if (table.spec().isUnpartitioned()) {
+      return null;
+    }
+
+    // Iceberg currently doesn't have publicly accessible partition transform information, hence use above string parse
+    List<PartitionTransformSpec> partitionTransformSpecs = getPartitionTransformSpec(hmsTable);
+
+    DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(Maps.newLinkedHashMap(),
+        hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
+        hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
+    List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = Lists.newLinkedList();
+    dpCtx.setCustomSortExpressions(customSortExprs);
+
+    Map<String, Integer> fieldOrderMap = Maps.newHashMap();
+    List<Types.NestedField> fields = table.schema().columns();
+    for (int i = 0; i < fields.size(); ++i) {
+      fieldOrderMap.put(fields.get(i).name(), i);
+    }
+
+    for (PartitionTransformSpec spec : partitionTransformSpecs) {
+      int order = fieldOrderMap.get(spec.getColumnName());
+      if (PartitionTransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
+        customSortExprs.add(BUCKET_SORT_EXPR.apply(order, spec.getTransformParam().get()));
+      } else {
+        customSortExprs.add(cols -> cols.get(order).clone());
+      }
+    }
+
+    return dpCtx;
+  }
+
+  @Override
   public String getFileFormatPropertyKey() {
     return TableProperties.DEFAULT_FILE_FORMAT;
   }
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index b50bf4e..03784ca 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -253,8 +253,8 @@ public class HiveIcebergTestUtils {
     List<Record> sortedExpected = new ArrayList<>(expected);
     List<Record> sortedActual = new ArrayList<>(actual);
     // Sort based on the specified column
-    sortedExpected.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy)));
-    sortedActual.sort(Comparator.comparingLong(record -> (Long) record.get(sortBy)));
+    sortedExpected.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
+    sortedActual.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
 
     Assert.assertEquals(sortedExpected.size(), sortedActual.size());
     for (int i = 0; i < sortedExpected.size(); ++i) {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 24c43b3..2e3f5aa 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -77,7 +77,7 @@ public class TestHiveIcebergOutputCommitter {
   );
 
   private static final PartitionSpec PARTITIONED_SPEC =
-      PartitionSpec.builderFor(CUSTOMER_SCHEMA).bucket("customer_id", 3).build();
+      PartitionSpec.builderFor(CUSTOMER_SCHEMA).identity("customer_id").build();
 
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
@@ -123,8 +123,7 @@ public class TestHiveIcebergOutputCommitter {
     List<Record> expected = writeRecords(table.name(), 1, 0, true, false, conf);
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
 
-    // Expecting 3 files with fanout-, 4 with ClusteredWriter where writing to already completed partitions is allowed.
-    HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4);
+    HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2);
     HiveIcebergTestUtils.validateData(table, expected, 0);
   }
 
@@ -137,7 +136,7 @@ public class TestHiveIcebergOutputCommitter {
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
 
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 2);
-    HiveIcebergTestUtils.validateData(table, expected, 0);
+    HiveIcebergTestUtils.validateData(table, expected, 1);
   }
 
   @Test
@@ -148,9 +147,8 @@ public class TestHiveIcebergOutputCommitter {
     List<Record> expected = writeRecords(table.name(), 2, 0, true, false, conf);
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
 
-    // Expecting 6 files with fanout-, 8 with ClusteredWriter where writing to already completed partitions is allowed.
-    HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 8);
-    HiveIcebergTestUtils.validateData(table, expected, 0);
+    HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4);
+    HiveIcebergTestUtils.validateData(table, expected, 1);
   }
 
   @Test
@@ -174,7 +172,7 @@ public class TestHiveIcebergOutputCommitter {
     List<Record> expected = writeRecords(table.name(), 2, 2, true, false, conf);
     committer.commitJob(new JobContextImpl(conf, JOB_ID));
     HiveIcebergTestUtils.validateFiles(table, conf, JOB_ID, 4);
-    HiveIcebergTestUtils.validateData(table, expected, 0);
+    HiveIcebergTestUtils.validateData(table, expected, 1);
   }
 
   @Test
@@ -270,6 +268,10 @@ public class TestHiveIcebergOutputCommitter {
 
     for (int i = 0; i < taskNum; ++i) {
       List<Record> records = TestHelper.generateRandomRecords(schema, RECORD_NUM, i + attemptNum);
+      // making customer_id deterministic for result comparisons
+      for (int j = 0; j < RECORD_NUM; ++j) {
+        records.get(j).setField("customer_id", j / 3L);
+      }
       TaskAttemptID taskId = new TaskAttemptID(JOB_ID.getJtIdentifier(), JOB_ID.getId(), TaskType.MAP, i, attemptNum);
       int partitionId = taskId.getTaskID().getId();
       String operationId = QUERY_ID + "-" + JOB_ID;
diff --git a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q
index 8ea1f12..9309452 100644
--- a/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q
+++ b/iceberg/iceberg-handler/src/test/queries/positive/dynamic_partition_writes.q
@@ -1,21 +1,45 @@
+-- Mask the file size values as it can have slight variability, causing test flakiness
+--! qt:replace:/("file_size_in_bytes":)\d+/$1#Masked#/
+--! qt:replace:/("total-files-size":)\d+/$1#Masked#/
+--! qt:replace:/((ORC|PARQUET|AVRO)\s+\d+\s+)\d+/$1#Masked#/
+
 drop table if exists tbl_src;
 drop table if exists tbl_target_identity;
 drop table if exists tbl_target_bucket;
+drop table if exists tbl_target_mixed;
 
 
-create external table tbl_src (a int, b string) stored by iceberg stored as orc;
-insert into tbl_src values (1, 'EUR'), (2, 'EUR'), (3, 'USD'), (4, 'EUR'), (5, 'HUF'), (6, 'USD'), (7, 'USD'), (8, 'PLN'), (9, 'PLN'), (10, 'CZK');
+create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc;
+insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, 'EUR', 12), (5, 'HUF', 30), (6, 'USD', 10), (7, 'USD', 100), (8, 'PLN', 20), (9, 'PLN', 11), (10, 'CZK', 5);
 --need at least 2 files to ensure ClusteredWriter encounters out-of-order records
-insert into tbl_src values (10, 'EUR'), (20, 'EUR'), (30, 'USD'), (40, 'EUR'), (50, 'HUF'), (60, 'USD'), (70, 'USD'), (80, 'PLN'), (90, 'PLN'), (100, 'CZK');
+insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12);
 
 create external table tbl_target_identity (a int) partitioned by (ccy string) stored by iceberg stored as orc;
-explain insert overwrite table tbl_target_identity select * from tbl_src;
-insert overwrite table tbl_target_identity select * from tbl_src;
-select * from tbl_target_identity order by a;
+explain insert overwrite table tbl_target_identity select a, b from tbl_src;
+insert overwrite table tbl_target_identity select a, b from tbl_src;
+select * from tbl_target_identity order by a, ccy;
 
---bucketed case - although SortedDynPartitionOptimizer kicks in for this case too, its work is futile as it sorts values rather than the computed buckets
---thus we need this case to check that ClusteredWriter allows out-of-order records for bucket partition spec (only)
+--bucketed case - should invoke GenericUDFIcebergBucket to calculate buckets before sorting
 create external table tbl_target_bucket (a int, ccy string) partitioned by spec (bucket (2, ccy)) stored by iceberg stored as orc;
-explain insert into table tbl_target_bucket select * from tbl_src;
-insert into table tbl_target_bucket select * from tbl_src;
-select * from tbl_target_bucket order by a;
\ No newline at end of file
+explain insert into table tbl_target_bucket select a, b from tbl_src;
+insert into table tbl_target_bucket select a, b from tbl_src;
+select * from tbl_target_bucket order by a, ccy;
+
+--mixed case - 1 identity + 1 bucket cols
+create external table tbl_target_mixed (a int, ccy string, c bigint) partitioned by spec (ccy, bucket (3, c)) stored by iceberg stored as orc;
+explain insert into table tbl_target_mixed select * from tbl_src;
+insert into table tbl_target_mixed select * from tbl_src;
+select * from tbl_target_mixed order by a, ccy;
+select * from default.tbl_target_mixed.partitions;
+select * from default.tbl_target_mixed.files;
+
+--1 of 2 partition cols is folded with constant - should still sort
+explain insert into table tbl_target_mixed select * from tbl_src where b = 'EUR';
+insert into table tbl_target_mixed select * from tbl_src where b = 'EUR';
+
+--all partitions cols folded - should not sort as it's not needed
+explain insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100;
+insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100;
+
+select * from tbl_target_mixed order by a, ccy;
+select * from default.tbl_target_mixed.files;
diff --git a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
index 656e5ba..91b3808 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/dynamic_partition_writes.q.out
@@ -10,27 +10,31 @@ PREHOOK: query: drop table if exists tbl_target_bucket
 PREHOOK: type: DROPTABLE
 POSTHOOK: query: drop table if exists tbl_target_bucket
 POSTHOOK: type: DROPTABLE
-PREHOOK: query: create external table tbl_src (a int, b string) stored by iceberg stored as orc
+PREHOOK: query: drop table if exists tbl_target_mixed
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists tbl_target_mixed
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
 PREHOOK: Output: default@tbl_src
-POSTHOOK: query: create external table tbl_src (a int, b string) stored by iceberg stored as orc
+POSTHOOK: query: create external table tbl_src (a int, b string, c bigint) stored by iceberg stored as orc
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@tbl_src
-PREHOOK: query: insert into tbl_src values (1, 'EUR'), (2, 'EUR'), (3, 'USD'), (4, 'EUR'), (5, 'HUF'), (6, 'USD'), (7, 'USD'), (8, 'PLN'), (9, 'PLN'), (10, 'CZK')
+PREHOOK: query: insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, 'EUR', 12), (5, 'HUF', 30), (6, 'USD', 10), (7, 'USD', 100), (8, 'PLN', 20), (9, 'PLN', 11), (10, 'CZK', 5)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
 PREHOOK: Output: default@tbl_src
-POSTHOOK: query: insert into tbl_src values (1, 'EUR'), (2, 'EUR'), (3, 'USD'), (4, 'EUR'), (5, 'HUF'), (6, 'USD'), (7, 'USD'), (8, 'PLN'), (9, 'PLN'), (10, 'CZK')
+POSTHOOK: query: insert into tbl_src values (1, 'EUR', 10), (2, 'EUR', 10), (3, 'USD', 11), (4, 'EUR', 12), (5, 'HUF', 30), (6, 'USD', 10), (7, 'USD', 100), (8, 'PLN', 20), (9, 'PLN', 11), (10, 'CZK', 5)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_src
-PREHOOK: query: insert into tbl_src values (10, 'EUR'), (20, 'EUR'), (30, 'USD'), (40, 'EUR'), (50, 'HUF'), (60, 'USD'), (70, 'USD'), (80, 'PLN'), (90, 'PLN'), (100, 'CZK')
+PREHOOK: query: insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12)
 PREHOOK: type: QUERY
 PREHOOK: Input: _dummy_database@_dummy_table
 PREHOOK: Output: default@tbl_src
-POSTHOOK: query: insert into tbl_src values (10, 'EUR'), (20, 'EUR'), (30, 'USD'), (40, 'EUR'), (50, 'HUF'), (60, 'USD'), (70, 'USD'), (80, 'PLN'), (90, 'PLN'), (100, 'CZK')
+POSTHOOK: query: insert into tbl_src values (10, 'EUR', 12), (20, 'EUR', 11), (30, 'USD', 100), (40, 'EUR', 10), (50, 'HUF', 30), (60, 'USD', 12), (70, 'USD', 20), (80, 'PLN', 100), (90, 'PLN', 18), (100, 'CZK', 12)
 POSTHOOK: type: QUERY
 POSTHOOK: Input: _dummy_database@_dummy_table
 POSTHOOK: Output: default@tbl_src
@@ -42,11 +46,11 @@ POSTHOOK: query: create external table tbl_target_identity (a int) partitioned b
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@tbl_target_identity
-PREHOOK: query: explain insert overwrite table tbl_target_identity select * from tbl_src
+PREHOOK: query: explain insert overwrite table tbl_target_identity select a, b from tbl_src
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_src
 PREHOOK: Output: default@tbl_target_identity
-POSTHOOK: query: explain insert overwrite table tbl_target_identity select * from tbl_src
+POSTHOOK: query: explain insert overwrite table tbl_target_identity select a, b from tbl_src
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_src
 POSTHOOK: Output: default@tbl_target_identity
@@ -68,10 +72,9 @@ Stage-3
               File Output Operator [FS_18]
                 table:{"name:":"default.tbl_target_identity"}
                 Select Operator [SEL_17]
-                  Output:["_col0","_col1"]
+                  Output:["_col0","_col1","_col1"]
                 <-Map 1 [SIMPLE_EDGE] vectorized
                   PARTITION_ONLY_SHUFFLE [RS_13]
-                    PartitionCols:_col1
                     Select Operator [SEL_12] (rows=20 width=91)
                       Output:["_col0","_col1"]
                       TableScan [TS_0] (rows=20 width=91)
@@ -90,19 +93,19 @@ Stage-3
                           Output:["a","ccy"]
                            Please refer to the previous Select Operator [SEL_12]
 
-PREHOOK: query: insert overwrite table tbl_target_identity select * from tbl_src
+PREHOOK: query: insert overwrite table tbl_target_identity select a, b from tbl_src
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_src
 PREHOOK: Output: default@tbl_target_identity
-POSTHOOK: query: insert overwrite table tbl_target_identity select * from tbl_src
+POSTHOOK: query: insert overwrite table tbl_target_identity select a, b from tbl_src
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_src
 POSTHOOK: Output: default@tbl_target_identity
-PREHOOK: query: select * from tbl_target_identity order by a
+PREHOOK: query: select * from tbl_target_identity order by a, ccy
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_target_identity
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_target_identity order by a
+POSTHOOK: query: select * from tbl_target_identity order by a, ccy
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_target_identity
 POSTHOOK: Output: hdfs://### HDFS PATH ###
@@ -115,8 +118,8 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 7	USD
 8	PLN
 9	PLN
-10	EUR
 10	CZK
+10	EUR
 20	EUR
 30	USD
 40	EUR
@@ -134,11 +137,11 @@ POSTHOOK: query: create external table tbl_target_bucket (a int, ccy string) par
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
 POSTHOOK: Output: default@tbl_target_bucket
-PREHOOK: query: explain insert into table tbl_target_bucket select * from tbl_src
+PREHOOK: query: explain insert into table tbl_target_bucket select a, b from tbl_src
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_src
 PREHOOK: Output: default@tbl_target_bucket
-POSTHOOK: query: explain insert into table tbl_target_bucket select * from tbl_src
+POSTHOOK: query: explain insert into table tbl_target_bucket select a, b from tbl_src
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_src
 POSTHOOK: Output: default@tbl_target_bucket
@@ -160,10 +163,9 @@ Stage-3
               File Output Operator [FS_18]
                 table:{"name:":"default.tbl_target_bucket"}
                 Select Operator [SEL_17]
-                  Output:["_col0","_col1"]
+                  Output:["_col0","_col1","iceberg_bucket(_col1, 2)"]
                 <-Map 1 [SIMPLE_EDGE] vectorized
                   PARTITION_ONLY_SHUFFLE [RS_13]
-                    PartitionCols:_col1
                     Select Operator [SEL_12] (rows=20 width=91)
                       Output:["_col0","_col1"]
                       TableScan [TS_0] (rows=20 width=91)
@@ -182,19 +184,19 @@ Stage-3
                           Output:["a","ccy"]
                            Please refer to the previous Select Operator [SEL_12]
 
-PREHOOK: query: insert into table tbl_target_bucket select * from tbl_src
+PREHOOK: query: insert into table tbl_target_bucket select a, b from tbl_src
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_src
 PREHOOK: Output: default@tbl_target_bucket
-POSTHOOK: query: insert into table tbl_target_bucket select * from tbl_src
+POSTHOOK: query: insert into table tbl_target_bucket select a, b from tbl_src
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_src
 POSTHOOK: Output: default@tbl_target_bucket
-PREHOOK: query: select * from tbl_target_bucket order by a
+PREHOOK: query: select * from tbl_target_bucket order by a, ccy
 PREHOOK: type: QUERY
 PREHOOK: Input: default@tbl_target_bucket
 PREHOOK: Output: hdfs://### HDFS PATH ###
-POSTHOOK: query: select * from tbl_target_bucket order by a
+POSTHOOK: query: select * from tbl_target_bucket order by a, ccy
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@tbl_target_bucket
 POSTHOOK: Output: hdfs://### HDFS PATH ###
@@ -207,8 +209,8 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 7	USD
 8	PLN
 9	PLN
-10	EUR
 10	CZK
+10	EUR
 20	EUR
 30	USD
 40	EUR
@@ -218,3 +220,319 @@ POSTHOOK: Output: hdfs://### HDFS PATH ###
 80	PLN
 90	PLN
 100	CZK
+PREHOOK: query: create external table tbl_target_mixed (a int, ccy string, c bigint) partitioned by spec (ccy, bucket (3, c)) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: create external table tbl_target_mixed (a int, ccy string, c bigint) partitioned by spec (ccy, bucket (3, c)) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@tbl_target_mixed
+PREHOOK: query: explain insert into table tbl_target_mixed select * from tbl_src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_src
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: explain insert into table tbl_target_mixed select * from tbl_src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_src
+POSTHOOK: Output: default@tbl_target_mixed
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE)
+Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+
+Stage-3
+  Stats Work{}
+    Stage-0
+      Move Operator
+        table:{"name:":"default.tbl_target_mixed"}
+        Stage-2
+          Dependency Collection{}
+            Stage-1
+              Reducer 2 vectorized
+              File Output Operator [FS_18]
+                table:{"name:":"default.tbl_target_mixed"}
+                Select Operator [SEL_17]
+                  Output:["_col0","_col1","_col2","_col1","iceberg_bucket(_col2, 3)"]
+                <-Map 1 [SIMPLE_EDGE] vectorized
+                  PARTITION_ONLY_SHUFFLE [RS_13]
+                    Select Operator [SEL_12] (rows=20 width=99)
+                      Output:["_col0","_col1","_col2"]
+                      TableScan [TS_0] (rows=20 width=99)
+                        default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"]
+              Reducer 3 vectorized
+              File Output Operator [FS_21]
+                Select Operator [SEL_20] (rows=1 width=794)
+                  Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17"]
+                  Group By Operator [GBY_19] (rows=1 width=500)
+                    Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"]
+                  <-Map 1 [CUSTOM_SIMPLE_EDGE] vectorized
+                    PARTITION_ONLY_SHUFFLE [RS_16]
+                      Group By Operator [GBY_15] (rows=1 width=568)
+                        Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"]
+                        Select Operator [SEL_14] (rows=20 width=99)
+                          Output:["a","ccy","c"]
+                           Please refer to the previous Select Operator [SEL_12]
+
+PREHOOK: query: insert into table tbl_target_mixed select * from tbl_src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_src
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: insert into table tbl_target_mixed select * from tbl_src
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_src
+POSTHOOK: Output: default@tbl_target_mixed
+PREHOOK: query: select * from tbl_target_mixed order by a, ccy
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_target_mixed
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from tbl_target_mixed order by a, ccy
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_target_mixed
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	EUR	10
+2	EUR	10
+3	USD	11
+4	EUR	12
+5	HUF	30
+6	USD	10
+7	USD	100
+8	PLN	20
+9	PLN	11
+10	CZK	5
+10	EUR	12
+20	EUR	11
+30	USD	100
+40	EUR	10
+50	HUF	30
+60	USD	12
+70	USD	20
+80	PLN	100
+90	PLN	18
+100	CZK	12
+PREHOOK: query: select * from default.tbl_target_mixed.partitions
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_target_mixed
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.tbl_target_mixed.partitions
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_target_mixed
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+{"ccy":"EUR","c_bucket":0}	1	1
+{"ccy":"EUR","c_bucket":1}	2	1
+{"ccy":"HUF","c_bucket":1}	2	1
+{"ccy":"EUR","c_bucket":2}	3	1
+{"ccy":"USD","c_bucket":1}	3	1
+{"ccy":"CZK","c_bucket":1}	1	1
+{"ccy":"USD","c_bucket":0}	2	1
+{"ccy":"USD","c_bucket":2}	1	1
+{"ccy":"CZK","c_bucket":2}	1	1
+{"ccy":"PLN","c_bucket":2}	1	1
+{"ccy":"PLN","c_bucket":0}	2	1
+{"ccy":"PLN","c_bucket":1}	1	1
+PREHOOK: query: select * from default.tbl_target_mixed.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_target_mixed
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.tbl_target_mixed.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_target_mixed
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"CZK","c_bucket":1}	1	449	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:d,2:CZK,3:}	{1:d,2:CZK,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"CZK","c_bucket":2}	1	432	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:
+,2:CZK,3:}	{1:
+,2:CZK,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"EUR","c_bucket":0}	1	432	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:,2:EUR,3:}	{1:,2:EUR,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"EUR","c_bucket":1}	2	448	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:EUR,3:}	{1:
+,2:EUR,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"EUR","c_bucket":2}	3	448	{1:8,2:17,3:5}	{1:3,2:3,3:3}	{1:0,2:0,3:0}	{}	{1:,2:EUR,3:
+}	{1:(,2:EUR,3:
+}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"HUF","c_bucket":1}	2	448	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:HUF,3:}	{1:2,2:HUF,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"PLN","c_bucket":0}	2	448	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:PLN,3:}	{1:	,2:PLN,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"PLN","c_bucket":1}	1	448	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:P,2:PLN,3:d}	{1:P,2:PLN,3:d}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"PLN","c_bucket":2}	1	449	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:Z,2:PLN,3:}	{1:Z,2:PLN,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"USD","c_bucket":0}	2	462	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:USD,3:}	{1:F,2:USD,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"USD","c_bucket":1}	3	470	{1:8,2:17,3:8}	{1:3,2:3,3:3}	{1:0,2:0,3:0}	{}	{1:,2:USD,3:}	{1:<,2:USD,3:d}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"USD","c_bucket":2}	1	432	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:,2:USD,3:
+}	{1:,2:USD,3:
+}	NULL	[3]	NULL	0
+PREHOOK: query: explain insert into table tbl_target_mixed select * from tbl_src where b = 'EUR'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_src
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: explain insert into table tbl_target_mixed select * from tbl_src where b = 'EUR'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_src
+POSTHOOK: Output: default@tbl_target_mixed
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (SIMPLE_EDGE)
+Reducer 3 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+
+Stage-3
+  Stats Work{}
+    Stage-0
+      Move Operator
+        table:{"name:":"default.tbl_target_mixed"}
+        Stage-2
+          Dependency Collection{}
+            Stage-1
+              Reducer 2 vectorized
+              File Output Operator [FS_21]
+                table:{"name:":"default.tbl_target_mixed"}
+                Select Operator [SEL_20]
+                  Output:["_col0","_col1","_col2","_col1","iceberg_bucket(_col2, 3)"]
+                <-Map 1 [SIMPLE_EDGE] vectorized
+                  PARTITION_ONLY_SHUFFLE [RS_16]
+                    Select Operator [SEL_15] (rows=4 width=99)
+                      Output:["_col0","_col1","_col2"]
+                      Filter Operator [FIL_14] (rows=4 width=99)
+                        predicate:(b = 'EUR')
+                        TableScan [TS_0] (rows=20 width=99)
+                          default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"]
+              Reducer 3 vectorized
+              File Output Operator [FS_24]
+                Select Operator [SEL_23] (rows=1 width=794)
+                  Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17"]
+                  Group By Operator [GBY_22] (rows=1 width=500)
+                    Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"]
+                  <-Map 1 [CUSTOM_SIMPLE_EDGE] vectorized
+                    PARTITION_ONLY_SHUFFLE [RS_19]
+                      Group By Operator [GBY_18] (rows=1 width=568)
+                        Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"]
+                        Select Operator [SEL_17] (rows=4 width=99)
+                          Output:["a","ccy","c"]
+                           Please refer to the previous Select Operator [SEL_15]
+
+PREHOOK: query: insert into table tbl_target_mixed select * from tbl_src where b = 'EUR'
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_src
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: insert into table tbl_target_mixed select * from tbl_src where b = 'EUR'
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_src
+POSTHOOK: Output: default@tbl_target_mixed
+PREHOOK: query: explain insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_src
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: explain insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_src
+POSTHOOK: Output: default@tbl_target_mixed
+Plan optimized by CBO.
+
+Vertex dependency in root stage
+Reducer 2 <- Map 1 (CUSTOM_SIMPLE_EDGE)
+
+Stage-3
+  Stats Work{}
+    Stage-0
+      Move Operator
+        table:{"name:":"default.tbl_target_mixed"}
+        Stage-2
+          Dependency Collection{}
+            Stage-1
+              Reducer 2 vectorized
+              File Output Operator [FS_20]
+                Select Operator [SEL_19] (rows=1 width=794)
+                  Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12","_col13","_col14","_col15","_col16","_col17"]
+                  Group By Operator [GBY_18] (rows=1 width=500)
+                    Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(VALUE._col0)","max(VALUE._col1)","count(VALUE._col2)","count(VALUE._col3)","compute_bit_vector_hll(VALUE._col4)","max(VALUE._col5)","avg(VALUE._col6)","count(VALUE._col7)","compute_bit_vector_hll(VALUE._col8)","min(VALUE._col9)","max(VALUE._col10)","count(VALUE._col11)","compute_bit_vector_hll(VALUE._col12)"]
+                  <-Map 1 [CUSTOM_SIMPLE_EDGE] vectorized
+                    File Output Operator [FS_14]
+                      table:{"name:":"default.tbl_target_mixed"}
+                      Select Operator [SEL_13] (rows=1 width=99)
+                        Output:["_col0","_col1","_col2"]
+                        Filter Operator [FIL_12] (rows=1 width=99)
+                          predicate:((c = 100L) and (b = 'USD'))
+                          TableScan [TS_0] (rows=20 width=99)
+                            default@tbl_src,tbl_src,Tbl:COMPLETE,Col:COMPLETE,Output:["a","b","c"]
+                    PARTITION_ONLY_SHUFFLE [RS_17]
+                      Group By Operator [GBY_16] (rows=1 width=568)
+                        Output:["_col0","_col1","_col2","_col3","_col4","_col5","_col6","_col7","_col8","_col9","_col10","_col11","_col12"],aggregations:["min(a)","max(a)","count(1)","count(a)","compute_bit_vector_hll(a)","max(length(ccy))","avg(COALESCE(length(ccy),0))","count(ccy)","compute_bit_vector_hll(ccy)","min(c)","max(c)","count(c)","compute_bit_vector_hll(c)"]
+                        Select Operator [SEL_15] (rows=1 width=99)
+                          Output:["a","ccy","c"]
+                           Please refer to the previous Select Operator [SEL_13]
+
+PREHOOK: query: insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_src
+PREHOOK: Output: default@tbl_target_mixed
+POSTHOOK: query: insert into table tbl_target_mixed select * from tbl_src where b = 'USD' and c = 100
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_src
+POSTHOOK: Output: default@tbl_target_mixed
+PREHOOK: query: select * from tbl_target_mixed order by a, ccy
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_target_mixed
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from tbl_target_mixed order by a, ccy
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_target_mixed
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+1	EUR	10
+1	EUR	10
+2	EUR	10
+2	EUR	10
+3	USD	11
+4	EUR	12
+4	EUR	12
+5	HUF	30
+6	USD	10
+7	USD	100
+7	USD	100
+8	PLN	20
+9	PLN	11
+10	CZK	5
+10	EUR	12
+10	EUR	12
+20	EUR	11
+20	EUR	11
+30	USD	100
+30	USD	100
+40	EUR	10
+40	EUR	10
+50	HUF	30
+60	USD	12
+70	USD	20
+80	PLN	100
+90	PLN	18
+100	CZK	12
+PREHOOK: query: select * from default.tbl_target_mixed.files
+PREHOOK: type: QUERY
+PREHOOK: Input: default@tbl_target_mixed
+PREHOOK: Output: hdfs://### HDFS PATH ###
+POSTHOOK: query: select * from default.tbl_target_mixed.files
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@tbl_target_mixed
+POSTHOOK: Output: hdfs://### HDFS PATH ###
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"USD","c_bucket":1}	2	466	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:USD,3:d}	{1:,2:USD,3:d}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"EUR","c_bucket":0}	1	432	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:,2:EUR,3:}	{1:,2:EUR,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"EUR","c_bucket":1}	2	448	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:EUR,3:}	{1:
+,2:EUR,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"EUR","c_bucket":2}	3	448	{1:8,2:17,3:5}	{1:3,2:3,3:3}	{1:0,2:0,3:0}	{}	{1:,2:EUR,3:
+}	{1:(,2:EUR,3:
+}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"CZK","c_bucket":1}	1	449	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:d,2:CZK,3:}	{1:d,2:CZK,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"CZK","c_bucket":2}	1	432	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:
+,2:CZK,3:}	{1:
+,2:CZK,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"EUR","c_bucket":0}	1	432	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:,2:EUR,3:}	{1:,2:EUR,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"EUR","c_bucket":1}	2	448	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:EUR,3:}	{1:
+,2:EUR,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"EUR","c_bucket":2}	3	448	{1:8,2:17,3:5}	{1:3,2:3,3:3}	{1:0,2:0,3:0}	{}	{1:,2:EUR,3:
+}	{1:(,2:EUR,3:
+}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"HUF","c_bucket":1}	2	448	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:HUF,3:}	{1:2,2:HUF,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"PLN","c_bucket":0}	2	448	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:PLN,3:}	{1:	,2:PLN,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"PLN","c_bucket":1}	1	448	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:P,2:PLN,3:d}	{1:P,2:PLN,3:d}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"PLN","c_bucket":2}	1	449	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:Z,2:PLN,3:}	{1:Z,2:PLN,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"USD","c_bucket":0}	2	462	{1:7,2:18,3:7}	{1:2,2:2,3:2}	{1:0,2:0,3:0}	{}	{1:,2:USD,3:}	{1:F,2:USD,3:}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"USD","c_bucket":1}	3	470	{1:8,2:17,3:8}	{1:3,2:3,3:3}	{1:0,2:0,3:0}	{}	{1:,2:USD,3:}	{1:<,2:USD,3:d}	NULL	[3]	NULL	0
+0	hdfs://### HDFS PATH ###	ORC	0	{"ccy":"USD","c_bucket":2}	1	432	{1:6,2:12,3:6}	{1:1,2:1,3:1}	{1:0,2:0,3:0}	{}	{1:,2:USD,3:
+}	{1:,2:USD,3:
+}	NULL	[3]	NULL	0
diff --git a/iceberg/patched-iceberg-core/pom.xml b/iceberg/patched-iceberg-core/pom.xml
index 9e29bb2..bf49c73 100644
--- a/iceberg/patched-iceberg-core/pom.xml
+++ b/iceberg/patched-iceberg-core/pom.xml
@@ -75,8 +75,7 @@
                   <overWrite>true</overWrite>
                   <outputDirectory>${project.build.directory}/classes</outputDirectory>
                   <excludes>
-                                        **/ClusteredWriter.class
-                                    </excludes>
+                  </excludes>
                 </artifactItem>
               </artifactItems>
             </configuration>
diff --git a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java b/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
deleted file mode 100644
index 8211c96..0000000
--- a/iceberg/patched-iceberg-core/src/main/java/org/apache/iceberg/io/ClusteredWriter.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.io;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.Comparator;
-import java.util.Set;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.encryption.EncryptedOutputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Comparators;
-import org.apache.iceberg.types.Types.StructType;
-import org.apache.iceberg.util.StructLikeSet;
-
-/**
- * Copied over from Iceberg 0.13.0 and has a difference of allowing bucket transform partitions to arrive without
- * any ordering. This is a temporary convenience solution until a permanent one probably using UDFs is done.
- *
- * Original description:
- *
- * A writer capable of writing to multiple specs and partitions that requires the incoming records
- * to be clustered by partition spec and by partition within each spec.
- * <p>
- * As opposed to {@link FanoutWriter}, this writer keeps at most one file open to reduce
- * the memory consumption. Prefer using this writer whenever the incoming records can be clustered
- * by spec/partition.
- */
-abstract class ClusteredWriter<T, R> implements PartitioningWriter<T, R> {
-
-  private static final Class<?> BUCKET_TRANSFORM_CLAZZ;
-
-  static {
-    try {
-      BUCKET_TRANSFORM_CLAZZ = Class.forName("org.apache.iceberg.transforms.Bucket");
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException("Couldn't find Bucket transform class", e);
-    }
-  }
-
-  private static final String NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE =
-      "Incoming records violate the writer assumption that records are clustered by spec and " +
-          "by partition within each spec. Either cluster the incoming records or switch to fanout writers.\n" +
-          "Encountered records that belong to already closed files:\n";
-
-  private final Set<Integer> completedSpecIds = Sets.newHashSet();
-
-  private PartitionSpec currentSpec = null;
-  private Comparator<StructLike> partitionComparator = null;
-  private Set<StructLike> completedPartitions = null;
-  private StructLike currentPartition = null;
-  private FileWriter<T, R> currentWriter = null;
-
-  private boolean closed = false;
-
-  protected abstract FileWriter<T, R> newWriter(PartitionSpec spec, StructLike partition);
-
-  protected abstract void addResult(R result);
-
-  protected abstract R aggregatedResult();
-
-  @Override
-  public void write(T row, PartitionSpec spec, StructLike partition) {
-    if (!spec.equals(currentSpec)) {
-      if (currentSpec != null) {
-        closeCurrentWriter();
-        completedSpecIds.add(currentSpec.specId());
-        completedPartitions.clear();
-      }
-
-      if (completedSpecIds.contains(spec.specId())) {
-        String errorCtx = String.format("spec %s", spec);
-        throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx);
-      }
-
-      StructType partitionType = spec.partitionType();
-
-      this.currentSpec = spec;
-      this.partitionComparator = Comparators.forType(partitionType);
-      this.completedPartitions = StructLikeSet.create(partitionType);
-      // copy the partition key as the key object may be reused
-      this.currentPartition = StructCopy.copy(partition);
-      this.currentWriter = newWriter(currentSpec, currentPartition);
-
-    } else if (partition != currentPartition && partitionComparator.compare(partition, currentPartition) != 0) {
-      closeCurrentWriter();
-      completedPartitions.add(currentPartition);
-
-      if (completedPartitions.contains(partition) && !hasBucketTransform(currentSpec)) {
-        String errorCtx = String.format("partition '%s' in spec %s", spec.partitionToPath(partition), spec);
-        throw new IllegalStateException(NOT_CLUSTERED_ROWS_ERROR_MSG_TEMPLATE + errorCtx);
-      }
-
-      // copy the partition key as the key object may be reused
-      this.currentPartition = StructCopy.copy(partition);
-      this.currentWriter = newWriter(currentSpec, currentPartition);
-    }
-
-    currentWriter.write(row);
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (!closed) {
-      closeCurrentWriter();
-      this.closed = true;
-    }
-  }
-
-  private static boolean hasBucketTransform(PartitionSpec spec) {
-    return spec.fields().stream().anyMatch(f -> BUCKET_TRANSFORM_CLAZZ.isAssignableFrom(f.transform().getClass()));
-  }
-
-  private void closeCurrentWriter() {
-    if (currentWriter != null) {
-      try {
-        currentWriter.close();
-      } catch (IOException e) {
-        throw new UncheckedIOException("Failed to close current writer", e);
-      }
-
-      addResult(currentWriter.result());
-
-      this.currentWriter = null;
-    }
-  }
-
-  @Override
-  public final R result() {
-    Preconditions.checkState(closed, "Cannot get result from unclosed writer");
-    return aggregatedResult();
-  }
-
-  protected EncryptedOutputFile newOutputFile(OutputFileFactory fileFactory, PartitionSpec spec, StructLike partition) {
-    Preconditions.checkArgument(spec.isUnpartitioned() || partition != null,
-        "Partition must not be null when creating output file for partitioned spec");
-    return partition == null ? fileFactory.newOutputFile() : fileFactory.newOutputFile(spec, partition);
-  }
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
index 1c77496..92382bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
@@ -585,6 +585,13 @@ public final class FunctionRegistry {
     system.registerGenericUDF(GenericUDFMaskShowFirstN.UDF_NAME, GenericUDFMaskShowFirstN.class);
     system.registerGenericUDF(GenericUDFMaskShowLastN.UDF_NAME, GenericUDFMaskShowLastN.class);
     system.registerGenericUDF(GenericUDFMaskHash.UDF_NAME, GenericUDFMaskHash.class);
+
+    try {
+      system.registerGenericUDF("iceberg_bucket",
+          (Class<? extends GenericUDF>) Class.forName("org.apache.iceberg.mr.hive.GenericUDFIcebergBucket"));
+    } catch (ClassNotFoundException e) {
+      LOG.warn("iceberg_bucket function could not be registered");
+    }
   }
 
   public static String getNormalizedFunctionName(String fn) throws SemanticException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index 0e38574..4eef621 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.hive.ql.metadata;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import java.net.URI;
 import java.net.URISyntaxException;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.api.LockType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -32,6 +34,7 @@ import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -281,6 +284,21 @@ public interface HiveStorageHandler extends Configurable {
   }
 
   /**
+   * Creates a DynamicPartitionCtx instance that will be set up by the storage handler itself. Useful for non-native
+   * tables where partitions are not handled by Hive, and sorting is required in a custom way before writing the table.
+   * @param conf job conf
+   * @param table the HMS table
+   * @return the created DP context object, null if DP context / sorting is not required
+   * @throws SemanticException
+   */
+  default DynamicPartitionCtx createDPContext(HiveConf conf, org.apache.hadoop.hive.ql.metadata.Table table)
+      throws SemanticException {
+    Preconditions.checkState(alwaysUnpartitioned(), "Should only be called for table formats where partitioning " +
+        "is not handled by Hive but the table format itself. See alwaysUnpartitioned() method.");
+    return null;
+  }
+
+  /**
    * Get file format property key, if the file format is configured through a table property.
    * @return table property key, can be null
    */
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 5ec206e..03b4124 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -23,11 +23,13 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -75,6 +77,7 @@ import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.orc.OrcConf;
@@ -90,10 +93,20 @@ import com.google.common.collect.Sets;
  * sorts the records on partition, bucket and sort columns respectively before inserting records
  * into the destination table. This enables reducers to keep only one record writer all the time
  * thereby reducing the the memory pressure on the reducers.
+ * Sorting is based on the Dynamic Partitioning context that is already created in the file sink operator.
+ * If that contains instructions for custom expression sorting, then this optimizer will disregard any partitioning or
+ * bucketing information of the Hive (table format) table, and will arrange the plan solely as per the custom exprs.
  */
 public class SortedDynPartitionOptimizer extends Transform {
 
-  private static final String BUCKET_NUMBER_COL_NAME = "_bucket_number";
+  private static final Function<List<ExprNodeDesc>, ExprNodeDesc> BUCKET_SORT_EXPRESSION = cols -> {
+    try {
+      return ExprNodeGenericFuncDesc.newInstance(
+          FunctionRegistry.getFunctionInfo("bucket_number").getGenericUDF(), new ArrayList<>());
+    } catch (SemanticException e) {
+      throw new RuntimeException(e);
+    }
+  };
 
   @Override
   public ParseContext transform(ParseContext pCtx) throws SemanticException {
@@ -169,17 +182,27 @@ public class SortedDynPartitionOptimizer extends Transform {
 
       // unlink connection between FS and its parent
       Operator<? extends OperatorDesc> fsParent = fsOp.getParentOperators().get(0);
-      // if all dp columns got constant folded then disable this optimization
-      if (allStaticPartitions(fsParent, fsOp.getConf().getDynPartCtx())) {
+      DynamicPartitionCtx dpCtx = fsOp.getConf().getDynPartCtx();
+
+      ArrayList<ColumnInfo> parentCols = Lists.newArrayList(fsParent.getSchema().getSignature());
+      ArrayList<ExprNodeDesc> allRSCols = Lists.newArrayList();
+      for (ColumnInfo ci : parentCols) {
+        allRSCols.add(new ExprNodeColumnDesc(ci));
+      }
+
+      // if all dp columns / custom sort expressions got constant folded then disable this optimization
+      if (allStaticPartitions(fsParent, allRSCols, dpCtx)) {
         LOG.debug("Bailing out of sorted dynamic partition optimizer as all dynamic partition" +
             " columns got constant folded (static partitioning)");
         return null;
       }
 
-      DynamicPartitionCtx dpCtx = fsOp.getConf().getDynPartCtx();
       List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
+      LinkedList<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs =
+          new LinkedList<>(dpCtx.getCustomSortExpressions());
 
-      if (!shouldDo(partitionPositions, fsParent)) {
+      // If custom sort expressions are present, there is an explicit requirement to do sorting
+      if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, fsParent)) {
         return null;
       }
       // if RS is inserted by enforce bucketing or sorting, we need to remove it
@@ -277,15 +300,9 @@ public class SortedDynPartitionOptimizer extends Transform {
       fsOp.getConf().setNumFiles(1);
       fsOp.getConf().setTotalFiles(1);
 
-      ArrayList<ColumnInfo> parentCols = Lists.newArrayList(fsParent.getSchema().getSignature());
-      ArrayList<ExprNodeDesc> allRSCols = Lists.newArrayList();
-      for (ColumnInfo ci : parentCols) {
-        allRSCols.add(new ExprNodeColumnDesc(ci));
-      }
-
       // Create ReduceSink operator
-      ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder, sortNullOrder,
-          allRSCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
+      ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, customSortExprs, sortOrder,
+          sortNullOrder, allRSCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
       // we have to make sure not to reorder the child operators as it might cause weird behavior in the tasks at
       // the same level. when there is auto stats gather at the same level as another operation then it might
       // cause unnecessary preemption. Maintaining the order here to avoid such preemption and possible errors
@@ -319,17 +336,26 @@ public class SortedDynPartitionOptimizer extends Transform {
         }
         descs.add(newColumnExpr);
       }
-
       RowSchema selRS = new RowSchema(fsParent.getSchema());
-      if (bucketColumns!= null && !bucketColumns.isEmpty()) {
-        descs.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
-            ReduceField.KEY.toString()+"."+BUCKET_NUMBER_COL_NAME, null, false));
-        colNames.add(BUCKET_NUMBER_COL_NAME);
-        ColumnInfo ci = new ColumnInfo(BUCKET_NUMBER_COL_NAME, TypeInfoFactory.stringTypeInfo,
-            selRS.getSignature().get(0).getTabAlias(), true, true);
+
+      if (bucketColumns != null && !bucketColumns.isEmpty()) {
+        customSortExprs.add(BUCKET_SORT_EXPRESSION);
+      }
+
+      for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
+        ExprNodeDesc colExpr = customSortExpr.apply(allRSCols);
+        String customSortColName = colExpr.getExprString();
+        TypeInfo customSortColTypeInfo = colExpr.getTypeInfo();
+
+        descs.add(new ExprNodeColumnDesc(customSortColTypeInfo, ReduceField.KEY + "." + customSortColName,
+            null, false));
+        colNames.add(customSortColName);
+        ColumnInfo ci = new ColumnInfo(
+            customSortColName, customSortColTypeInfo, selRS.getSignature().get(0).getTabAlias(), true, true);
         selRS.getSignature().add(ci);
         rsOp.getSchema().getSignature().add(ci);
       }
+
       // Create SelectDesc
       SelectDesc selConf = new SelectDesc(descs, colNames);
 
@@ -360,30 +386,54 @@ public class SortedDynPartitionOptimizer extends Transform {
       return null;
     }
 
-    private boolean allStaticPartitions(Operator<? extends OperatorDesc> op,
+    private boolean allStaticPartitions(Operator<? extends OperatorDesc> op, List<ExprNodeDesc> allRSCols,
         final DynamicPartitionCtx dynPartCtx) {
-      int numDpCols = dynPartCtx.getNumDPCols();
-      int numCols = op.getSchema().getColumnNames().size();
-      List<String> dpCols = op.getSchema().getColumnNames().subList(numCols - numDpCols, numCols);
+
       if (op.getColumnExprMap() == null) {
         // find first operator upstream with valid (non-null) column expression map
-        for(Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
+        for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
           if (parent.getColumnExprMap() != null) {
             op = parent;
             break;
           }
         }
       }
-      if (op.getColumnExprMap() != null) {
-        for(String dpCol : dpCols) {
-          ExprNodeDesc end = ExprNodeDescUtils.findConstantExprOrigin(dpCol, op);
-          if (!(end instanceof ExprNodeConstantDesc)) {
-            return false;
-          }
+      // No mappings for any columns
+      if (op.getColumnExprMap() == null) {
+        return false;
+      }
+
+      List<String> referencedSortColumnNames = new LinkedList<>();
+      List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = dynPartCtx.getCustomSortExpressions();
+
+      if (customSortExprs != null && !customSortExprs.isEmpty()) {
+        Set<ExprNodeColumnDesc> columnDescs = new HashSet<>();
+
+        // Find relevant column descs (e.g. _col0, _col2) for each sort expression
+        for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
+          ExprNodeDesc sortExpressionForRSSchema = customSortExpr.apply(allRSCols);
+          columnDescs.addAll(ExprNodeDescUtils.findAllColumnDescs(sortExpressionForRSSchema));
+        }
+
+        for (ExprNodeColumnDesc columnDesc : columnDescs) {
+          referencedSortColumnNames.add(columnDesc.getColumn());
         }
+
       } else {
-        return false;
+        int numDpCols = dynPartCtx.getNumDPCols();
+        int numCols = op.getSchema().getColumnNames().size();
+        referencedSortColumnNames.addAll(op.getSchema().getColumnNames().subList(numCols - numDpCols, numCols));
+      }
+
+      for(String dpCol : referencedSortColumnNames) {
+        ExprNodeDesc end = ExprNodeDescUtils.findConstantExprOrigin(dpCol, op);
+        if (!(end instanceof ExprNodeConstantDesc)) {
+          // There is at least 1 column with no constant mapping -> we will need to do the sorting
+          return false;
+        }
       }
+
+      // All columns had constant mappings
       return true;
     }
 
@@ -521,20 +571,30 @@ public class SortedDynPartitionOptimizer extends Transform {
       }
     }
 
-    public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions,
-        List<Integer> sortPositions, List<Integer> sortOrder, List<Integer> sortNullOrder,
-        ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns, int numBuckets,
-        Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) throws SemanticException {
+    public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List<Integer> sortPositions,
+        List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs, List<Integer> sortOrder,
+        List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns,
+        int numBuckets, Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) {
+
+      // Order of KEY columns, if custom sort is present partition and bucket columns are disregarded:
+      // 0) Custom sort expressions
+      //                              1) Partition columns
+      //                              2) Bucket number column
+      //                 3) Sort columns
+
+      boolean customSortExprPresent = customSortExprs != null && !customSortExprs.isEmpty();
 
-      // Order of KEY columns
-      // 1) Partition columns
-      // 2) Bucket number column
-      // 3) Sort columns
       Set<Integer> keyColsPosInVal = Sets.newLinkedHashSet();
       ArrayList<ExprNodeDesc> keyCols = Lists.newArrayList();
       List<Integer> newSortOrder = Lists.newArrayList();
       List<Integer> newSortNullOrder = Lists.newArrayList();
 
+      if (customSortExprPresent) {
+        partitionPositions = new ArrayList<>();
+        bucketColumns = new ArrayList<>();
+        numBuckets = -1;
+      }
+
       keyColsPosInVal.addAll(partitionPositions);
       if (bucketColumns != null && !bucketColumns.isEmpty()) {
         keyColsPosInVal.add(-1);
@@ -548,7 +608,7 @@ public class SortedDynPartitionOptimizer extends Transform {
           order = 0;
         }
       }
-      for (int i = 0; i < keyColsPosInVal.size(); i++) {
+      for (int i = 0; i < keyColsPosInVal.size() + customSortExprs.size(); i++) {
         newSortOrder.add(order);
       }
 
@@ -571,7 +631,7 @@ public class SortedDynPartitionOptimizer extends Transform {
           nullOrder = 1;
         }
       }
-      for (int i = 0; i < keyColsPosInVal.size(); i++) {
+      for (int i = 0; i < keyColsPosInVal.size() + customSortExprs.size(); i++) {
         newSortNullOrder.add(nullOrder);
       }
 
@@ -587,15 +647,15 @@ public class SortedDynPartitionOptimizer extends Transform {
       Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
       ArrayList<ExprNodeDesc> partCols = Lists.newArrayList();
 
+      for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
+        keyCols.add(customSortExpr.apply(allCols));
+      }
+
       // we will clone here as RS will update bucket column key with its
       // corresponding with bucket number and hence their OIs
       for (Integer idx : keyColsPosInVal) {
-        if (idx < 0) {
-          ExprNodeDesc bucketNumColUDF = ExprNodeGenericFuncDesc.newInstance(
-              FunctionRegistry.getFunctionInfo("bucket_number").getGenericUDF(), new ArrayList<>());
-          keyCols.add(bucketNumColUDF);
-          colExprMap.put(Utilities.ReduceField.KEY + "." +BUCKET_NUMBER_COL_NAME, bucketNumColUDF);
-
+        if (idx == -1) {
+          keyCols.add(BUCKET_SORT_EXPRESSION.apply(allCols));
         } else {
           keyCols.add(allCols.get(idx).clone());
         }
@@ -674,7 +734,10 @@ public class SortedDynPartitionOptimizer extends Transform {
       }
       ReduceSinkOperator op = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
           rsConf, new RowSchema(signature), parent);
-      rsConf.addComputedField(Utilities.ReduceField.KEY + "." + BUCKET_NUMBER_COL_NAME);
+      rsConf.addComputedField(Utilities.ReduceField.KEY + "." + BUCKET_SORT_EXPRESSION.apply(allCols).getExprString());
+      for (Function<List<ExprNodeDesc>, ExprNodeDesc> customSortExpr : customSortExprs) {
+        rsConf.addComputedField(Utilities.ReduceField.KEY + "." + customSortExpr.apply(allCols).getExprString());
+      }
       op.setColumnExprMap(colExprMap);
       return op;
     }
@@ -738,7 +801,7 @@ public class SortedDynPartitionOptimizer extends Transform {
 
     // the idea is to estimate how many number of writers this insert can spun up.
     // Writers are proportional to number of partitions being inserted i.e cardinality of the partition columns
-    //  if these writers are less than number of writers allowed within the memory pool (estimated) we go ahead with
+    //  if these writers are more than number of writers allowed within the memory pool (estimated) we go ahead with
     //  adding extra RS
     //  The way max number of writers allowed are computed based on
     //  (executor/container memory) * (percentage of memory taken by orc)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index be15bd4..d179a94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8129,13 +8129,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       // Some non-native tables might be partitioned without partition spec information being present in the Table object
       HiveStorageHandler storageHandler = dest_tab.getStorageHandler();
       if (storageHandler != null && storageHandler.alwaysUnpartitioned()) {
-        List<PartitionTransformSpec> nonNativePartSpecs = storageHandler.getPartitionTransformSpec(dest_tab);
-        if (dpCtx == null && nonNativePartSpecs != null && !nonNativePartSpecs.isEmpty()) {
-          verifyDynamicPartitionEnabled(conf, qb, dest);
-          Map<String, String> partSpec = new LinkedHashMap<>();
-          nonNativePartSpecs.forEach(ps -> partSpec.put(ps.getColumnName(), null));
-          dpCtx = new DynamicPartitionCtx(partSpec, conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
-              conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE));
+        DynamicPartitionCtx nonNativeDpCtx = storageHandler.createDPContext(conf, dest_tab);
+        if (dpCtx == null && nonNativeDpCtx != null) {
+          dpCtx = nonNativeDpCtx;
         }
       }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index c1aeb8f..4acc540 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.plan;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
@@ -50,6 +52,14 @@ public class DynamicPartitionCtx implements Serializable {
   private String defaultPartName; // default partition name in case of null or empty value
   private int maxPartsPerNode;    // maximum dynamic partitions created per mapper/reducer
   private Pattern whiteListPattern;
+  /**
+   * Expressions describing a custom way of sorting the table before write. Expressions can reference simple
+   * column descriptions or a tree of expressions containing more columns and UDFs.
+   * Can be useful for custom bucket/hash sorting.
+   * A custom expression should be a lambda that is given the original column description expressions as per read
+   * schema and returns a single expression. Example for simply just referencing column 3: cols -> cols.get(3).clone()
+   */
+  private transient List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions;
 
   public DynamicPartitionCtx() {
   }
@@ -82,6 +92,7 @@ public class DynamicPartitionCtx implements Serializable {
       throw new SemanticException(e);
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
+    this.customSortExpressions = new LinkedList<>();
   }
 
   public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
@@ -114,6 +125,7 @@ public class DynamicPartitionCtx implements Serializable {
       throw new SemanticException(e);
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
+    this.customSortExpressions = new LinkedList<>();
   }
 
   public DynamicPartitionCtx(DynamicPartitionCtx dp) {
@@ -128,6 +140,7 @@ public class DynamicPartitionCtx implements Serializable {
     this.defaultPartName = dp.defaultPartName;
     this.maxPartsPerNode = dp.maxPartsPerNode;
     this.whiteListPattern = dp.whiteListPattern;
+    this.customSortExpressions = dp.customSortExpressions;
   }
 
   public Pattern getWhiteListPattern() {
@@ -213,4 +226,12 @@ public class DynamicPartitionCtx implements Serializable {
   public String getSPPath() {
     return this.spPath;
   }
+
+  public List<Function<List<ExprNodeDesc>, ExprNodeDesc>> getCustomSortExpressions() {
+    return customSortExpressions;
+  }
+
+  public void setCustomSortExpressions(List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions) {
+    this.customSortExpressions = customSortExpressions;
+  }
 }