You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2022/07/21 11:23:56 UTC

[hive] branch master updated: HIVE-26397: Honour Iceberg sort orders when writing a table. (#3445) (Laszlo Pinter, reviewed by Adam Szita and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new dd424d77230 HIVE-26397: Honour Iceberg sort orders when writing a table. (#3445) (Laszlo Pinter, reviewed by Adam Szita and Peter Vary)
dd424d77230 is described below

commit dd424d77230f275b78d4465548cad5b57aca66e9
Author: László Pintér <47...@users.noreply.github.com>
AuthorDate: Thu Jul 21 13:23:43 2022 +0200

    HIVE-26397: Honour Iceberg sort orders when writing a table. (#3445) (Laszlo Pinter, reviewed by Adam Szita and Peter Vary)
---
 .../iceberg/mr/hive/HiveIcebergMetaHook.java       |   4 +-
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java | 116 +++++++++++++++------
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |  12 +--
 .../java/org/apache/iceberg/mr/TestHelper.java     |  19 +++-
 .../iceberg/mr/hive/HiveIcebergTestUtils.java      |  13 ++-
 .../iceberg/mr/hive/TestHiveIcebergInserts.java    |  86 +++++++++++++++
 .../org/apache/iceberg/mr/hive/TestTables.java     |  49 ++++++++-
 .../hive/ql/ddl/table/create/CreateTableDesc.java  |   6 +-
 .../desc/formatter/JsonDescTableFormatter.java     |   4 +-
 .../desc/formatter/TextDescTableFormatter.java     |  15 ++-
 .../set/AlterTableSetPartitionSpecAnalyzer.java    |   4 +-
 .../apache/hadoop/hive/ql/exec/DDLPlanUtils.java   |   8 +-
 .../hive/ql/metadata/HiveStorageHandler.java       |  23 +++-
 .../ql/optimizer/SortedDynPartitionOptimizer.java  |  38 +++++--
 .../hadoop/hive/ql/parse/PartitionTransform.java   |  13 ++-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |   2 +-
 ...titionTransformSpec.java => TransformSpec.java} |   6 +-
 .../hadoop/hive/ql/plan/DynamicPartitionCtx.java   |  24 +++++
 18 files changed, 362 insertions(+), 80 deletions(-)

diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
index c8f652d577a..1191ca8e431 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergMetaHook.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.PartitionTransform;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -296,7 +296,7 @@ public class HiveIcebergMetaHook implements HiveMetaHook {
         if (hmsTable.isSetPartitionKeys() && !hmsTable.getPartitionKeys().isEmpty()) {
           db.dropPartitions(hmsTable.getDbName(), hmsTable.getTableName(), EMPTY_FILTER, DROP_OPTIONS);
 
-          List<PartitionTransformSpec> spec = PartitionTransform.getPartitionTransformSpec(hmsTable.getPartitionKeys());
+          List<TransformSpec> spec = PartitionTransform.getPartitionTransformSpec(hmsTable.getPartitionKeys());
           if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC, spec)) {
             throw new MetaException("Query state attached to Session state must be not null. " +
                 "Partition transform metadata cannot be saved.");
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 25881408a63..51e3d60ad9d 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -57,8 +57,8 @@ import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.metadata.HiveStoragePredicateHandler;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -84,11 +84,15 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.NullOrder;
 import org.apache.iceberg.PartitionSpecParser;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.SerializableTable;
 import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.hadoop.HadoopConfigurable;
@@ -332,27 +336,35 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
   }
 
   @Override
-  public List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+  public List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
     TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
     Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
-    return table.spec().fields().stream().map(f -> {
-      PartitionTransformSpec spec = new PartitionTransformSpec();
-      spec.setColumnName(table.schema().findColumnName(f.sourceId()));
-      // right now the only way to fetch the transform type and its params is through the toString() call
-      String transformName = f.transform().toString().toUpperCase();
-      // if the transform name contains '[' it means it has some config params
-      if (transformName.contains("[")) {
-        spec.setTransformType(PartitionTransformSpec.TransformType
-            .valueOf(transformName.substring(0, transformName.indexOf("["))));
-        spec.setTransformParam(Optional.of(Integer
-            .valueOf(transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]")))));
-      } else {
-        spec.setTransformType(PartitionTransformSpec.TransformType.valueOf(transformName));
-        spec.setTransformParam(Optional.empty());
-      }
+    return table.spec().fields().stream().map(f ->
+      getTransformSpec(table, f.transform().toString().toUpperCase(), f.sourceId())
+    ).collect(Collectors.toList());
+  }
+
+  private List<TransformSpec> getSortTransformSpec(Table table) {
+    return table.sortOrder().fields().stream().map(s ->
+      getTransformSpec(table, s.transform().toString().toUpperCase(), s.sourceId())
+    ).collect(Collectors.toList());
+  }
+
+  private TransformSpec getTransformSpec(Table table, String transformName, int sourceId) {
+    TransformSpec spec = new TransformSpec();
+    spec.setColumnName(table.schema().findColumnName(sourceId));
+    // if the transform name contains '[' it means it has some config params
+    if (transformName.contains("[")) {
+      spec.setTransformType(TransformSpec.TransformType
+          .valueOf(transformName.substring(0, transformName.indexOf("["))));
+      spec.setTransformParam(Optional.of(Integer
+          .valueOf(transformName.substring(transformName.indexOf("[") + 1, transformName.indexOf("]")))));
+    } else {
+      spec.setTransformType(TransformSpec.TransformType.valueOf(transformName));
+      spec.setTransformParam(Optional.empty());
+    }
 
-      return spec;
-    }).collect(Collectors.toList());
+    return spec;
   }
 
   @Override
@@ -367,12 +379,6 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
 
     TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
     Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
-    if (table.spec().isUnpartitioned()) {
-      return null;
-    }
-
-    // Iceberg currently doesn't have publicly accessible partition transform information, hence use above string parse
-    List<PartitionTransformSpec> partitionTransformSpecs = getPartitionTransformSpec(hmsTable);
 
     DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(Maps.newLinkedHashMap(),
         hiveConf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME),
@@ -380,6 +386,39 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs = Lists.newLinkedList();
     dpCtx.setCustomSortExpressions(customSortExprs);
 
+    if (table.spec().isPartitioned()) {
+      addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getPartitionTransformSpec(hmsTable));
+    }
+
+    SortOrder sortOrder = table.sortOrder();
+    if (sortOrder.isSorted()) {
+      List<Integer> customSortPositions = Lists.newLinkedList();
+      List<Integer> customSortOrder = Lists.newLinkedList();
+      dpCtx.setCustomSortOrder(customSortOrder);
+      List<Integer> customSortNullOrder = Lists.newLinkedList();
+      dpCtx.setCustomSortNullOrder(customSortNullOrder);
+      for (SortField sortField : sortOrder.fields()) {
+        int pos = 0;
+        for (Types.NestedField field : table.schema().columns()) {
+          if (sortField.sourceId() == field.fieldId()) {
+            customSortPositions.add(pos);
+            customSortOrder.add(sortField.direction() == SortDirection.ASC ? 1 : 0);
+            customSortNullOrder.add(sortField.nullOrder() == NullOrder.NULLS_FIRST ? 0 : 1);
+            break;
+          }
+          pos++;
+        }
+      }
+
+      addCustomSortExpr(table, hmsTable, writeOperation, customSortExprs, getSortTransformSpec(table));
+    }
+
+    return dpCtx;
+  }
+
+  private void addCustomSortExpr(Table table,  org.apache.hadoop.hive.ql.metadata.Table hmsTable,
+      Operation writeOperation, List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
+      List<TransformSpec> transformSpecs) {
     Map<String, Integer> fieldOrderMap = Maps.newHashMap();
     List<Types.NestedField> fields = table.schema().columns();
     for (int i = 0; i < fields.size(); ++i) {
@@ -387,16 +426,15 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     }
 
     int offset = acidSelectColumns(hmsTable, writeOperation).size();
-    for (PartitionTransformSpec spec : partitionTransformSpecs) {
+
+    for (TransformSpec spec : transformSpecs) {
       int order = fieldOrderMap.get(spec.getColumnName());
-      if (PartitionTransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
+      if (TransformSpec.TransformType.BUCKET.equals(spec.getTransformType())) {
         customSortExprs.add(BUCKET_SORT_EXPR.apply(order + offset, spec.getTransformParam().get()));
       } else {
         customSortExprs.add(cols -> cols.get(order + offset).clone());
       }
     }
-
-    return dpCtx;
   }
 
   @Override
@@ -570,6 +608,26 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
     }
   }
 
+  @Override
+  public boolean supportsSortColumns() {
+    return true;
+  }
+
+  @Override
+  public List<FieldSchema> sortColumns(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
+    TableDesc tableDesc = Utilities.getTableDesc(hmsTable);
+    Table table = IcebergTableUtil.getTable(conf, tableDesc.getProperties());
+    if (table.sortOrder().isUnsorted()) {
+      return Collections.emptyList();
+    }
+
+    Schema schema = table.schema();
+    return table.sortOrder().fields().stream().map(s -> new FieldSchema(schema.findColumnName(s.sourceId()),
+        schema.findType(s.sourceId()).toString(),
+        String.format("Transform: %s, Sort direction: %s, Null sort order: %s",
+        s.transform().toString(), s.direction().name(), s.nullOrder().name()))).collect(Collectors.toList());
+  }
+
   private void setCommonJobConf(JobConf jobConf) {
     jobConf.set("tez.mrreader.config.update.properties", "hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids");
   }
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 3fe2eee39df..344834ec62e 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.iceberg.ManageSnapshots;
 import org.apache.iceberg.PartitionSpec;
@@ -98,15 +98,15 @@ public class IcebergTableUtil {
 
   /**
    * Create {@link PartitionSpec} based on the partition information stored in
-   * {@link PartitionTransformSpec}.
+   * {@link TransformSpec}.
    * @param configuration a Hadoop configuration
    * @param schema iceberg table schema
    * @return iceberg partition spec, always non-null
    */
   public static PartitionSpec spec(Configuration configuration, Schema schema) {
-    List<PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
+    List<TransformSpec> partitionTransformSpecList = SessionStateUtil
             .getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
-        .map(o -> (List<PartitionTransformSpec>) o).orElseGet(() -> null);
+        .map(o -> (List<TransformSpec>) o).orElseGet(() -> null);
 
     if (partitionTransformSpecList == null) {
       LOG.debug("Iceberg partition transform spec is not found in QueryState.");
@@ -154,9 +154,9 @@ public class IcebergTableUtil {
     UpdatePartitionSpec updatePartitionSpec = table.updateSpec().caseSensitive(false);
     table.spec().fields().forEach(field -> updatePartitionSpec.removeField(field.name()));
 
-    List<PartitionTransformSpec> partitionTransformSpecList = SessionStateUtil
+    List<TransformSpec> partitionTransformSpecList = SessionStateUtil
         .getResource(configuration, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC)
-        .map(o -> (List<PartitionTransformSpec>) o).orElseGet(() -> null);
+        .map(o -> (List<TransformSpec>) o).orElseGet(() -> null);
 
     partitionTransformSpecList.forEach(spec -> {
       switch (spec.getTransformType()) {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
index df61cfd8c37..5614281a731 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -54,6 +55,7 @@ public class TestHelper {
   private final FileFormat fileFormat;
   private final TemporaryFolder tmp;
   private final Map<String, String> tblProps;
+  private SortOrder order;
 
   private Table table;
 
@@ -79,6 +81,10 @@ public class TestHelper {
     conf.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(table.schema()));
   }
 
+  public void setOrder(SortOrder order) {
+    this.order = order;
+  }
+
   public Table table() {
     return table;
   }
@@ -91,13 +97,22 @@ public class TestHelper {
   }
 
   public Table createTable(Schema theSchema, PartitionSpec theSpec) {
-    Table tbl = tables.create(theSchema, theSpec, properties(), tableIdentifier);
+    return createTable(theSchema, theSpec, null);
+  }
+
+  public Table createTable(Schema theSchema, PartitionSpec theSpec, SortOrder theOrder) {
+    Table tbl;
+    if (theOrder != null) {
+      tbl = tables.create(theSchema, theSpec, theOrder, properties(), tableIdentifier);
+    } else {
+      tbl = tables.create(theSchema, theSpec, properties(), tableIdentifier);
+    }
     setTable(tbl);
     return tbl;
   }
 
   public Table createTable() {
-    return createTable(schema, spec);
+    return createTable(schema, spec, order);
   }
 
   public Table createUnpartitionedTable() {
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
index 5e217acc82a..dc68ce9a80b 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/HiveIcebergTestUtils.java
@@ -261,8 +261,17 @@ public class HiveIcebergTestUtils {
     sortedActual.sort(Comparator.comparingInt(record -> record.get(sortBy).hashCode()));
 
     Assert.assertEquals(sortedExpected.size(), sortedActual.size());
-    for (int i = 0; i < sortedExpected.size(); ++i) {
-      assertEquals(sortedExpected.get(i), sortedActual.get(i));
+    validateData(sortedExpected, sortedActual);
+  }
+
+  /**
+   * Validates that each expected record equals the actual record at the same position.
+   * @param expected The expected list of Records
+   * @param actual The actual list of Records
+   */
+  public static void validateData(List<Record> expected, List<Record> actual) {
+    for (int i = 0; i < expected.size(); ++i) {
+      assertEquals(expected.get(i), actual.get(i));
     }
   }
 
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
index 31a589a7c96..0c15ba4e430 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInserts.java
@@ -25,6 +25,7 @@ import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.Record;
@@ -38,6 +39,10 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
+import static org.apache.iceberg.NullOrder.NULLS_FIRST;
+import static org.apache.iceberg.NullOrder.NULLS_LAST;
+import static org.apache.iceberg.expressions.Expressions.bucket;
+import static org.apache.iceberg.expressions.Expressions.truncate;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.apache.iceberg.types.Types.NestedField.required;
 
@@ -47,6 +52,87 @@ import static org.apache.iceberg.types.Types.NestedField.required;
  */
 public class TestHiveIcebergInserts extends HiveIcebergStorageHandlerWithEngineBase {
 
+  @Test
+  public void testSortedInsert() throws IOException {
+    TableIdentifier identifier = TableIdentifier.of("default", "sort_table");
+
+    Schema schema = new Schema(
+        optional(1, "id", Types.IntegerType.get(), "unique ID"),
+        optional(2, "data", Types.StringType.get())
+    );
+    SortOrder order = SortOrder.builderFor(schema)
+        .asc("id", NULLS_FIRST)
+        .desc("data", NULLS_LAST)
+        .build();
+
+    testTables.createTable(shell, identifier.name(), schema, order, PartitionSpec.unpartitioned(), fileFormat,
+        ImmutableList.of(), 1, ImmutableMap.of());
+    shell.executeStatement(String.format("INSERT INTO TABLE %s VALUES (4, 'a'), (1, 'a'), (3, 'a'), (2, 'a'), " +
+            "(null, 'a'), (3, 'b'), (3, null)", identifier.name()));
+
+    List<Record> expected = TestHelper.RecordsBuilder.newInstance(schema)
+        .add(null, "a").add(1, "a").add(2, "a").add(3, "b").add(3, "a").add(3, null).add(4, "a")
+        .build();
+    List<Object[]> result = shell.executeStatement(String.format("SELECT * FROM %s", identifier.name()));
+    HiveIcebergTestUtils.validateData(expected, HiveIcebergTestUtils.valueForRow(schema, result));
+  }
+
+  @Test
+  public void testSortedAndTransformedInsert() throws IOException {
+    TableIdentifier identifier = TableIdentifier.of("default", "sort_table");
+
+    SortOrder order = SortOrder.builderFor(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .asc(bucket("customer_id", 2), NULLS_FIRST)
+        .desc(truncate("first_name", 4), NULLS_LAST)
+        .asc("last_name", NULLS_LAST)
+        .build();
+
+    testTables.createTable(shell, identifier.name(), HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, order,
+        PartitionSpec.unpartitioned(), fileFormat, ImmutableList.of(), 1, ImmutableMap.of());
+
+    StringBuilder insertQuery = new StringBuilder().append(String.format("INSERT INTO %s VALUES ", identifier.name()));
+    HiveIcebergStorageHandlerTestUtils.OTHER_CUSTOMER_RECORDS_2.forEach(record -> insertQuery.append("(")
+        .append(record.get(0)).append(",'")
+        .append(record.get(1)).append("','")
+        .append(record.get(2)).append("'),"));
+    insertQuery.setLength(insertQuery.length() - 1);
+
+    shell.executeStatement(insertQuery.toString());
+    List<Record> expected = TestHelper.RecordsBuilder.newInstance(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA)
+        .add(2L, "Susan", "Morrison").add(1L, "Sharon", "Taylor").add(1L, "Joanna", "Pierce")
+        .add(2L, "Joanna", "Silver").add(2L, "Jake", "Donnel").add(2L, "Bob", "Silver").add(3L, "Trudy", "Henderson")
+        .add(3L, "Trudy", "Johnson").add(3L, "Blake", "Burr").build();
+    List<Object[]> result = shell.executeStatement(String.format("SELECT * FROM %s", identifier.name()));
+    HiveIcebergTestUtils.validateData(expected,
+        HiveIcebergTestUtils.valueForRow(HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA, result));
+  }
+
+  @Test
+  public void testSortedAndTransformedInsertIntoPartitionedTable() throws IOException {
+    TableIdentifier identifier = TableIdentifier.of("default", "tbl_bucketed");
+    Schema schema = new Schema(
+        optional(1, "a", Types.IntegerType.get()),
+        optional(2, "b", Types.StringType.get()),
+        optional(3, "c", Types.IntegerType.get())
+    );
+    SortOrder order = SortOrder.builderFor(schema)
+        .desc("c", NULLS_FIRST)
+        .asc(truncate("b", 1))
+        .build();
+    PartitionSpec partitionSpec = PartitionSpec.builderFor(schema)
+        .bucket("b", 2)
+        .build();
+    testTables.createTable(shell, identifier.name(), schema, order, partitionSpec, fileFormat, ImmutableList.of(), 1,
+        ImmutableMap.of());
+    shell.executeStatement(String.format("INSERT INTO %s VALUES (1, 'EUR', 10), (5, 'HUF', 30), (2, 'EUR', 10), " +
+        "(8, 'PLN', 20), (6, 'USD', null)", identifier.name()));
+    List<Object[]> result = shell.executeStatement(String.format("SELECT * FROM %s", identifier.name()));
+    List<Record> expected =
+        TestHelper.RecordsBuilder.newInstance(schema).add(1, "EUR", 10).add(2, "EUR", 10).add(6, "USD", null)
+            .add(5, "HUF", 30).add(8, "PLN", 20).build();
+    HiveIcebergTestUtils.validateData(expected, HiveIcebergTestUtils.valueForRow(schema, result));
+  }
+
   @Test
   public void testInsert() throws IOException {
     Table table = testTables.createTable(shell, "customers", HiveIcebergStorageHandlerTestUtils.CUSTOMER_SCHEMA,
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
index f2b58d776c5..efaea9a8a01 100644
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
+++ b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/TestTables.java
@@ -251,9 +251,33 @@ abstract class TestTables {
    */
   public Table createTable(TestHiveShell shell, String tableName, Schema schema, FileFormat fileFormat,
       List<Record> records, int formatVersion, Map<String, String> tblProperties) throws IOException {
+    return createTable(shell, tableName, schema, SortOrder.unsorted(), PartitionSpec.unpartitioned(), fileFormat,
+        records, formatVersion, tblProperties);
+  }
+
+  /**
+   * Creates a Hive test table. Creates the Iceberg table/data and creates the corresponding Hive
+   * table as well when needed. The table will be in the 'default' database. The table will be populated with the
+   * provided List of {@link Record}s.
+   * @param shell The HiveShell used for Hive table creation
+   * @param tableName The name of the test table
+   * @param schema The schema used for the table creation
+   * @param order The sort order used for the table creation
+   * @param partSpec The partition spec used for the table creation
+   * @param fileFormat The file format used for writing the data
+   * @param records The records with which the table is populated
+   * @param formatVersion The version of the spec the table should use (format-version)
+   * @param tblProperties Additional table properties
+   * @return The created table
+   * @throws IOException If there is an error writing data
+   */
+  public Table createTable(TestHiveShell shell, String tableName, Schema schema, SortOrder order,
+      PartitionSpec partSpec, FileFormat fileFormat, List<Record> records, int formatVersion,
+      Map<String, String> tblProperties) throws IOException {
     ImmutableMap<String, String> tblProps = ImmutableMap.<String, String>builder().putAll(tblProperties)
         .put(TableProperties.FORMAT_VERSION, Integer.toString(formatVersion)).build();
-    Table table = createIcebergTable(shell.getHiveConf(), tableName, schema, fileFormat, tblProps, records);
+    Table table = createIcebergTable(shell.getHiveConf(), tableName, schema, order, partSpec, fileFormat, tblProps,
+        records);
     String createHiveSQL = createHiveTableSQL(TableIdentifier.of("default", tableName), tblProps);
     if (createHiveSQL != null) {
       shell.executeStatement(createHiveSQL);
@@ -386,9 +410,30 @@ abstract class TestTables {
    */
   public Table createIcebergTable(Configuration configuration, String tableName, Schema schema, FileFormat fileFormat,
       Map<String, String> additionalTableProps, List<Record> records) throws IOException {
+    return createIcebergTable(configuration, tableName, schema, SortOrder.unsorted(), PartitionSpec.unpartitioned(),
+        fileFormat, additionalTableProps, records);
+  }
+
+  /**
+   * Creates an Iceberg table/data without creating the corresponding Hive table. The table will be in the 'default'
+   * namespace.
+   * @param configuration The configuration used during the table creation
+   * @param tableName The name of the test table
+   * @param schema The schema used for the table creation
+   * @param order The sort order used for the table creation
+   * @param partSpec The partition spec used for the table creation
+   * @param fileFormat The file format used for writing the data
+   * @param records The records with which the table is populated
+   * @return The created table
+   * @throws IOException If there is an error writing data
+   */
+  public Table createIcebergTable(Configuration configuration, String tableName, Schema schema, SortOrder order,
+      PartitionSpec partSpec, FileFormat fileFormat, Map<String, String> additionalTableProps, List<Record> records)
+      throws IOException {
     String identifier = identifier("default." + tableName);
     TestHelper helper = new TestHelper(new Configuration(configuration), tables(), identifier, schema,
-        PartitionSpec.unpartitioned(), fileFormat, additionalTableProps, temp);
+        partSpec, fileFormat, additionalTableProps, temp);
+    helper.setOrder(order);
     Table table = helper.createTable();
 
     if (records != null && !records.isEmpty()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
index b484428cc07..eeac3a6d339 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/create/CreateTableDesc.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.PartitionManagementTask;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
@@ -45,7 +44,6 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -59,7 +57,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.PartitionTransform;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.Explain;
@@ -833,7 +831,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
       if (partCols.isPresent() && !partCols.get().isEmpty()) {
         // Add the partition columns to the normal columns and save the transform to the session state
         tbl.getSd().getCols().addAll(partCols.get());
-        List<PartitionTransformSpec> spec = PartitionTransform.getPartitionTransformSpec(partCols.get());
+        List<TransformSpec> spec = PartitionTransform.getPartitionTransformSpec(partCols.get());
         if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC, spec)) {
           throw new HiveException("Query state attached to Session state must be not null. " +
                                       "Partition transform metadata cannot be saved.");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java
index 1444eb23263..d115935b4ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/JsonDescTableFormatter.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
 import org.apache.hadoop.hive.ql.metadata.formatting.MapBuilder;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 
 import java.io.DataOutputStream;
 import java.util.ArrayList;
@@ -245,7 +245,7 @@ public class JsonDescTableFormatter extends DescTableFormatter {
     }
     if (table.isNonNative() && table.getStorageHandler() != null &&
         table.getStorageHandler().supportsPartitionTransform()) {
-      List<PartitionTransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table);
+      List<TransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table);
       if (!specs.isEmpty()) {
         builder.put("partitionSpecInfo", specs.stream().map(s -> {
           Map<String, String> result = new LinkedHashMap<>();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java
index 53273b9810c..5583337ce20 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/info/desc/formatter/TextDescTableFormatter.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
 import org.apache.hadoop.hive.ql.metadata.ForeignKeyInfo.ForeignKeyCol;
 import org.apache.hadoop.hive.ql.metadata.UniqueConstraint.UniqueConstraintCol;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hive.common.util.HiveStringUtils;
@@ -110,12 +110,12 @@ class TextDescTableFormatter extends DescTableFormatter {
     if (table.isNonNative() && table.getStorageHandler() != null &&
         table.getStorageHandler().supportsPartitionTransform()) {
 
-      List<PartitionTransformSpec> partSpecs = table.getStorageHandler().getPartitionTransformSpec(table);
+      List<TransformSpec> partSpecs = table.getStorageHandler().getPartitionTransformSpec(table);
       if (partSpecs != null && !partSpecs.isEmpty()) {
         TextMetaDataTable metaDataTable = new TextMetaDataTable();
         partitionTransformOutput += LINE_DELIM + "# Partition Transform Information" + LINE_DELIM + "# ";
         metaDataTable.addRow(DescTableDesc.PARTITION_TRANSFORM_SPEC_SCHEMA.split("#")[0].split(","));
-        for (PartitionTransformSpec spec : partSpecs) {
+        for (TransformSpec spec : partSpecs) {
           String[] row = new String[2];
           row[0] = spec.getColumnName();
           if (spec.getTransformType() != null) {
@@ -278,7 +278,14 @@ class TextDescTableFormatter extends DescTableFormatter {
       formatOutput("Num Buckets:", String.valueOf(storageDesc.getNumBuckets()), tableInfo);
       formatOutput("Bucket Columns:", storageDesc.getBucketCols().toString(), tableInfo);
     }
-    formatOutput("Sort Columns:", storageDesc.getSortCols().toString(), tableInfo);
+
+    String sortColumnsInfo;
+    if (table.isNonNative() && table.getStorageHandler() != null && table.getStorageHandler().supportsSortColumns()) {
+      sortColumnsInfo = table.getStorageHandler().sortColumns(table).toString();
+    } else {
+      sortColumnsInfo = storageDesc.getSortCols().toString();
+    }
+    formatOutput("Sort Columns:", sortColumnsInfo, tableInfo);
 
     if (storageDesc.isStoredAsSubDirectories()) {
       formatOutput("Stored As SubDirectories:", "Yes", tableInfo);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java
index 442d927b1b7..4764eedf561 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/set/AlterTableSetPartitionSpecAnalyzer.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.PartitionTransform;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 
@@ -54,7 +54,7 @@ public class AlterTableSetPartitionSpecAnalyzer extends AbstractAlterTableAnalyz
     Table table = getTable(tableName);
     validateAlterTableType(table, AlterTableType.SETPARTITIONSPEC, false);
     inputs.add(new ReadEntity(table));
-    List<PartitionTransformSpec> partitionTransformSpec =
+    List<TransformSpec> partitionTransformSpec =
         PartitionTransform.getPartitionTransformSpec(command);
     if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC,
         partitionTransformSpec)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java
index 62248ff4f71..649455163fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLPlanUtils.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PrimaryKeyInfo;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.UniqueConstraint;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.util.DirectionUtils;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
@@ -920,13 +920,13 @@ public class DDLPlanUtils {
   private String getPartitionsBySpec(Table table) {
     if (table.isNonNative() && table.getStorageHandler() != null &&
       table.getStorageHandler().supportsPartitionTransform()) {
-      List<PartitionTransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table);
+      List<TransformSpec> specs = table.getStorageHandler().getPartitionTransformSpec(table);
       if (specs.isEmpty()) {
         return "";
       }
       List<String> partitionTransforms = new ArrayList<>();
-      for (PartitionTransformSpec spec : specs) {
-        if (spec.getTransformType() == PartitionTransformSpec.TransformType.IDENTITY) {
+      for (TransformSpec spec : specs) {
+        if (spec.getTransformType() == TransformSpec.TransformType.IDENTITY) {
           partitionTransforms.add(spec.getColumnName());
         } else {
           partitionTransforms.add(spec.getTransformType().name() + "(" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
index b4b3cfca9d8..b67d5d83476 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.hive.ql.Context.Operation;
 import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.parse.AlterTableExecuteSpec;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec;
+import org.apache.hadoop.hive.ql.parse.TransformSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -346,6 +346,25 @@ public interface HiveStorageHandler extends Configurable {
     return Collections.emptyList();
   }
 
+  /**
+   * Check if the underlying storage handler implementation supports sort columns.
+   * @return true if the storage handler can support it; this default implementation returns false
+   */
+  default boolean supportsSortColumns() {
+    return false;
+  }
+
+  /**
+   * Collect the columns that are used to sort the content of the data files.
+   * @param table the table which is being sorted
+   * @return the columns that are used during data sorting; this default implementation returns an empty list
+   */
+  default List<FieldSchema> sortColumns(org.apache.hadoop.hive.ql.metadata.Table table) {
+    Preconditions.checkState(supportsSortColumns(), "Should only be called for table formats where data sorting " +
+        "is supported");
+    return Collections.emptyList();
+  }
+
   /**
    * Check if the underlying storage handler implementation support partition transformations.
    * @return true if the storage handler can support it
@@ -360,7 +379,7 @@ public interface HiveStorageHandler extends Configurable {
    * @param table the HMS table, must be non-null
    * @return partition transform specification, can be null.
    */
-  default List<PartitionTransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table table) {
+  default List<TransformSpec> getPartitionTransformSpec(org.apache.hadoop.hive.ql.metadata.Table table) {
     return null;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
index 2668f269b5f..b57ddd8e6c4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.ReduceField;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.SemanticDispatcher;
@@ -61,7 +62,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.parse.type.*;
+import org.apache.hadoop.hive.ql.parse.type.ExprNodeTypeCheck;
 import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -200,6 +201,8 @@ public class SortedDynPartitionOptimizer extends Transform {
       List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
       LinkedList<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs =
           new LinkedList<>(dpCtx.getCustomSortExpressions());
+      LinkedList<Integer> customSortOrder = new LinkedList<>(dpCtx.getCustomSortOrder());
+      LinkedList<Integer> customNullOrder = new LinkedList<>(dpCtx.getCustomSortNullOrder());
 
       // If custom sort expressions are present, there is an explicit requirement to do sorting
       if (customSortExprs.isEmpty() && !shouldDo(partitionPositions, fsParent)) {
@@ -301,8 +304,9 @@ public class SortedDynPartitionOptimizer extends Transform {
       fsOp.getConf().setTotalFiles(1);
 
       // Create ReduceSink operator
-      ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, customSortExprs, sortOrder,
-          sortNullOrder, allRSCols, bucketColumns, numBuckets, fsParent, fsOp.getConf().getWriteType());
+      ReduceSinkOperator rsOp = getReduceSinkOp(partitionPositions, sortPositions, sortOrder,
+          sortNullOrder, customSortExprs, customSortOrder, customNullOrder, allRSCols, bucketColumns, numBuckets,
+          fsParent, fsOp.getConf().getWriteType());
       // we have to make sure not to reorder the child operators as it might cause weird behavior in the tasks at
       // the same level. when there is auto stats gather at the same level as another operation then it might
       // cause unnecessary preemption. Maintaining the order here to avoid such preemption and possible errors
@@ -572,8 +576,10 @@ public class SortedDynPartitionOptimizer extends Transform {
     }
 
     public ReduceSinkOperator getReduceSinkOp(List<Integer> partitionPositions, List<Integer> sortPositions,
-        List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs, List<Integer> sortOrder,
-        List<Integer> sortNullOrder, ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns,
+        List<Integer> sortOrder, List<Integer> sortNullOrder,
+        List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExprs,
+        List<Integer> customSortOrder, List<Integer> customSortNullOrder,
+        ArrayList<ExprNodeDesc> allCols, ArrayList<ExprNodeDesc> bucketColumns,
         int numBuckets, Operator<? extends OperatorDesc> parent, AcidUtils.Operation writeType) {
 
       // Order of KEY columns, if custom sort is present partition and bucket columns are disregarded:
@@ -601,17 +607,25 @@ public class SortedDynPartitionOptimizer extends Transform {
       }
       keyColsPosInVal.addAll(sortPositions);
 
-      // by default partition and bucket columns are sorted in ascending order
       Integer order = 1;
+      // by default partition and bucket columns are sorted in ascending order
       if (sortOrder != null && !sortOrder.isEmpty()) {
         if (sortOrder.get(0) == 0) {
           order = 0;
         }
       }
-      for (int i = 0; i < keyColsPosInVal.size() + customSortExprs.size(); i++) {
+
+      for (Integer ignored : keyColsPosInVal) {
         newSortOrder.add(order);
       }
 
+      if (customSortExprPresent) {
+        for (int i = 0; i < customSortExprs.size() - customSortOrder.size(); i++) {
+          newSortOrder.add(order);
+        }
+        newSortOrder.addAll(customSortOrder);
+      }
+
       String orderStr = "";
       for (Integer i : newSortOrder) {
         if (i == 1) {
@@ -631,10 +645,18 @@ public class SortedDynPartitionOptimizer extends Transform {
           nullOrder = 1;
         }
       }
-      for (int i = 0; i < keyColsPosInVal.size() + customSortExprs.size(); i++) {
+
+      for (Integer ignored : keyColsPosInVal) {
         newSortNullOrder.add(nullOrder);
       }
 
+      if (customSortExprPresent) {
+        for (int i = 0; i < customSortExprs.size() - customSortNullOrder.size(); i++) {
+          newSortNullOrder.add(nullOrder);
+        }
+        newSortNullOrder.addAll(customSortNullOrder);
+      }
+
       String nullOrderStr = "";
       for (Integer i : newSortNullOrder) {
         if (i == 0) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java
index 50a6371a1bd..dbc40e8bd11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransform.java
@@ -18,11 +18,10 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.parse.PartitionTransformSpec.TransformType;
+import org.apache.hadoop.hive.ql.parse.TransformSpec.TransformType;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -45,9 +44,9 @@ public class PartitionTransform {
    * @param fields The partition column fields
    * @return list of partition transforms
    */
-  public static List<PartitionTransformSpec> getPartitionTransformSpec(List<FieldSchema> fields) {
+  public static List<TransformSpec> getPartitionTransformSpec(List<FieldSchema> fields) {
     return fields.stream()
-               .map(field -> new PartitionTransformSpec(field.getName(), TransformType.IDENTITY, Optional.empty()))
+               .map(field -> new TransformSpec(field.getName(), TransformType.IDENTITY, Optional.empty()))
                .collect(Collectors.toList());
   }
 
@@ -56,10 +55,10 @@ public class PartitionTransform {
    * @param node AST Tree node, must be not null
    * @return list of partition transforms
    */
-  public static List<PartitionTransformSpec> getPartitionTransformSpec(ASTNode node) {
-    List<PartitionTransformSpec> partSpecList = new ArrayList<>();
+  public static List<TransformSpec> getPartitionTransformSpec(ASTNode node) {
+    List<TransformSpec> partSpecList = new ArrayList<>();
     for (int i = 0; i < node.getChildCount(); i++) {
-      PartitionTransformSpec spec = new PartitionTransformSpec();
+      TransformSpec spec = new TransformSpec();
       ASTNode child = (ASTNode) node.getChild(i);
       for (int j = 0; j < child.getChildCount(); j++) {
         ASTNode grandChild = (ASTNode) child.getChild(j);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 485089e4ad3..1e51d51caf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -13645,7 +13645,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         }
         break;
       case HiveParser.TOK_TABLEPARTCOLSBYSPEC:
-        List<PartitionTransformSpec> partitionTransformSpec =
+        List<TransformSpec> partitionTransformSpec =
             PartitionTransform.getPartitionTransformSpec(child);
 
         if (!SessionStateUtil.addResource(conf, hive_metastoreConstants.PARTITION_TRANSFORM_SPEC,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransformSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TransformSpec.java
similarity index 90%
rename from ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransformSpec.java
rename to ql/src/java/org/apache/hadoop/hive/ql/parse/TransformSpec.java
index 268660f8efa..f25cbda5af8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/PartitionTransformSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TransformSpec.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import java.util.Optional;
 
-public class PartitionTransformSpec {
+public class TransformSpec {
 
   public enum TransformType {
     IDENTITY, YEAR, MONTH, DAY, HOUR, TRUNCATE, BUCKET, VOID
@@ -29,10 +29,10 @@ public class PartitionTransformSpec {
   private TransformType transformType;
   private Optional<Integer> transformParam;
 
-  public PartitionTransformSpec() {
+  public TransformSpec() {
   }
 
-  public PartitionTransformSpec(String columnName, TransformType transformType, Optional<Integer> transformParam) {
+  public TransformSpec(String columnName, TransformType transformType, Optional<Integer> transformParam) {
     this.columnName = columnName;
     this.transformType = transformType;
     this.transformParam = transformParam;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
index 4acc5406fb0..3497f3120cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
@@ -60,6 +60,8 @@ public class DynamicPartitionCtx implements Serializable {
    * schema and returns a single expression. Example for simply just referencing column 3: cols -> cols.get(3).clone()
    */
   private transient List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions;
+  private transient List<Integer> customSortOrder;
+  private transient List<Integer> customSortNullOrder;
 
   public DynamicPartitionCtx() {
   }
@@ -93,6 +95,8 @@ public class DynamicPartitionCtx implements Serializable {
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
     this.customSortExpressions = new LinkedList<>();
+    this.customSortOrder = new LinkedList<>();
+    this.customSortNullOrder = new LinkedList<>();
   }
 
   public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName,
@@ -126,6 +130,8 @@ public class DynamicPartitionCtx implements Serializable {
     }
     this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal);
     this.customSortExpressions = new LinkedList<>();
+    this.customSortOrder = new LinkedList<>();
+    this.customSortNullOrder = new LinkedList<>();
   }
 
   public DynamicPartitionCtx(DynamicPartitionCtx dp) {
@@ -141,6 +147,8 @@ public class DynamicPartitionCtx implements Serializable {
     this.maxPartsPerNode = dp.maxPartsPerNode;
     this.whiteListPattern = dp.whiteListPattern;
     this.customSortExpressions = dp.customSortExpressions;
+    this.customSortOrder = dp.customSortOrder;
+    this.customSortNullOrder = dp.customSortNullOrder;
   }
 
   public Pattern getWhiteListPattern() {
@@ -234,4 +242,20 @@ public class DynamicPartitionCtx implements Serializable {
   public void setCustomSortExpressions(List<Function<List<ExprNodeDesc>, ExprNodeDesc>> customSortExpressions) {
     this.customSortExpressions = customSortExpressions;
   }
+
+  public List<Integer> getCustomSortOrder() {
+    return customSortOrder;
+  }
+
+  public void setCustomSortOrder(List<Integer> customSortOrder) {
+    this.customSortOrder = customSortOrder;
+  }
+
+  public List<Integer> getCustomSortNullOrder() {
+    return customSortNullOrder;
+  }
+
+  public void setCustomSortNullOrder(List<Integer> customSortNullOrder) {
+    this.customSortNullOrder = customSortNullOrder;
+  }
 }