Posted to commits@hive.apache.org by ve...@apache.org on 2023/03/08 18:15:05 UTC

[hive] branch master updated: HIVE-26735: Ability to sort the data during rebalancing compaction (Laszlo Vegh, reviewed by Krisztian Kasa, Denys Kuzmenko, Sourabh Badhya)

This is an automated email from the ASF dual-hosted git repository.

veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new abbabdd86b3 HIVE-26735: Ability to sort the data during rebalancing compaction (Laszlo Vegh, reviewed by Krisztian Kasa, Denys Kuzmenko, Sourabh Badhya)
abbabdd86b3 is described below

commit abbabdd86b39e499a16105787028a9acb8baaef4
Author: veghlaci05 <ve...@gmail.com>
AuthorDate: Wed Mar 8 19:14:51 2023 +0100

    HIVE-26735: Ability to sort the data during rebalancing compaction (Laszlo Vegh, reviewed by Krisztian Kasa, Denys Kuzmenko, Sourabh Badhya)
    
    * HIVE-26735: Ability to sort the data during rebalancing compaction
    
    * fix failing tests
    
    * address review comments
    
    * address review comments - 2
    
    * remove empty test methods
---
 .../org/apache/hadoop/hive/conf/Constants.java     |   1 +
 .../ql/txn/compactor/CompactionPoolOnTezTest.java  |  13 +-
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  |   3 +-
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   | 322 ++++++++++++++-------
 .../hadoop/hive/ql/parse/AlterClauseParser.g       |   4 +-
 .../java/org/apache/hadoop/hive/ql/Compiler.java   |   3 +-
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  |   1 +
 .../storage/compact/AlterTableCompactAnalyzer.java |   7 +-
 .../storage/compact/AlterTableCompactDesc.java     |   9 +-
 .../compact/AlterTableCompactOperation.java        |   1 +
 .../concatenate/AlterTableConcatenateAnalyzer.java |   2 +-
 .../ql/txn/compactor/CompactionQueryBuilder.java   |  53 +++-
 .../ql/txn/compactor/RebalanceQueryCompactor.java  |   6 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java       |   9 +-
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   |   6 +-
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp    |  52 +++-
 .../src/gen/thrift/gen-cpp/hive_metastore_types.h  |  29 +-
 .../hive/metastore/api/CompactionInfoStruct.java   | 114 +++++++-
 .../hive/metastore/api/CompactionRequest.java      | 114 +++++++-
 .../apache/hadoop/hive/metastore/api/TxnType.java  |   5 +-
 .../gen-php/metastore/CompactionInfoStruct.php     |  24 ++
 .../thrift/gen-php/metastore/CompactionRequest.php |  24 ++
 .../src/gen/thrift/gen-php/metastore/TxnType.php   |   3 +
 .../src/gen/thrift/gen-py/hive_metastore/ttypes.py |  31 +-
 .../src/gen/thrift/gen-rb/hive_metastore_types.rb  |  13 +-
 .../hadoop/hive/metastore/txn/TxnQueries.java      |   9 +-
 .../src/main/thrift/hive_metastore.thrift          |   5 +-
 .../hadoop/hive/metastore/txn/CompactionInfo.java  |  61 ++--
 .../hive/metastore/txn/CompactionTxnHandler.java   |  17 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java      |   9 +-
 .../hive/metastore/utils/MetaStoreServerUtils.java |   5 +
 .../src/main/sql/derby/hive-schema-4.0.0.derby.sql |   6 +-
 .../derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql |   6 +-
 .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql |   2 +
 .../mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql |   6 +-
 .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql |   6 +-
 .../mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql |   6 +-
 .../main/sql/oracle/hive-schema-4.0.0.oracle.sql   |   6 +-
 .../upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql      |   6 +-
 .../sql/postgres/hive-schema-4.0.0.postgres.sql    |   6 +-
 .../upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql    |   6 +-
 .../upgrade-3.1.3000-to-4.0.0.postgres.sql         |   6 +-
 42 files changed, 800 insertions(+), 217 deletions(-)
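
The patch adds an optional ORDER BY clause to the compaction DDL, letting a rebalancing compaction re-sort rows while it redistributes them across buckets. A minimal usage sketch, with the table and column names taken from the new tests (illustrative only):

    ALTER TABLE rebalance_test COMPACT 'rebalance' ORDER BY b DESC;

Per the updated grammar, the clause follows the other optional parts, so a form like the following should also parse (illustrative, combining it with an explicit bucket count):

    ALTER TABLE rebalance_test COMPACT 'rebalance' CLUSTERED INTO 4 BUCKETS ORDER BY b DESC;

When an ORDER BY is given, the original ROW__ID writeIds cannot be preserved, so the generated compaction query assigns every row the maximum writeId (see the CompactionQueryBuilder change below).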

diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
index d39c671cac4..ef05123560f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java
@@ -102,6 +102,7 @@ public class Constants {
 
   public static final Pattern COMPACTION_POOLS_PATTERN = Pattern.compile("hive\\.compactor\\.worker\\.(.*)\\.threads");
   public static final String HIVE_COMPACTOR_WORKER_POOL = "hive.compactor.worker.pool";
+  public static final String HIVE_COMPACTOR_REBALANCE_ORDERBY = "hive.compactor.rebalance.orderby";
 
   public static final String HTTP_HEADER_REQUEST_TRACK = "X-Request-ID";
   public static final String TIME_POSTFIX_REQUEST_TRACK = "_TIME";
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionPoolOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionPoolOnTezTest.java
index f7e6985cbc7..ee552208bbe 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionPoolOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionPoolOnTezTest.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -68,7 +69,7 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest {
   @Test
   public void testAlterTableCompactCommandRespectsPoolName() throws Exception {
     Map<String, String> properties = new HashMap<>();
-    properties.put("hive.compactor.worker.pool", "pool1");
+    properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool1");
     provider.createFullAcidTable(null, DEFAULT_TABLE_NAME, false, false, properties);
     provider.insertTestData(DEFAULT_TABLE_NAME, false);
 
@@ -91,7 +92,7 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest {
   @Test
   public void testInitiatorRespectsTableLevelPoolName() throws Exception {
     Map<String, String> properties = new HashMap<>();
-    properties.put("hive.compactor.worker.pool", "pool1");
+    properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool1");
     provider.createFullAcidTable(null, DEFAULT_TABLE_NAME, false, false, properties);
     provider.insertTestData(DEFAULT_TABLE_NAME, false);
 
@@ -114,7 +115,7 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest {
   public void testInitiatorRespectsTableLevelPoolNameOverDbLevel() throws Exception {
     provider.createDb(NON_DEFAULT_DB_NAME, "db_pool");
     Map<String, String> properties = new HashMap<>();
-    properties.put("hive.compactor.worker.pool", "table_pool");
+    properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "table_pool");
     provider.createFullAcidTable(NON_DEFAULT_DB_NAME, DEFAULT_TABLE_NAME, false, false, properties);
     provider.insertTestData(NON_DEFAULT_DB_NAME, DEFAULT_TABLE_NAME);
 
@@ -126,7 +127,7 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest {
   @Test
   public void testShowCompactionsContainsPoolName() throws Exception {
     Map<String, String> properties = new HashMap<>();
-    properties.put("hive.compactor.worker.pool", "pool1");
+    properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool1");
     provider.createFullAcidTable(null, DEFAULT_TABLE_NAME, false, false, properties);
     provider.insertTestData(DEFAULT_TABLE_NAME, false);
     provider.createFullAcidTable(null, "table2", false, false);
@@ -150,10 +151,10 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest {
   @Test
   public void testShowCompactionsRespectPoolName() throws Exception {
     Map<String, String> properties = new HashMap<>();
-    properties.put("hive.compactor.worker.pool", "pool1");
+    properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool1");
     provider.createFullAcidTable(null, DEFAULT_TABLE_NAME, false, false, properties);
     provider.insertTestData(DEFAULT_TABLE_NAME, false);
-    properties.put("hive.compactor.worker.pool", "pool2");
+    properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool2");
     provider.createFullAcidTable(null, "table2", false, false, properties);
     provider.insertTestData("table2", false);
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 4c6209d55c7..56ca350b739 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -286,7 +287,7 @@ public abstract class CompactorOnTezTest {
 
     void createDb(String dbName, String poolName) throws Exception {
       executeStatementOnDriver("drop database if exists " + dbName + " cascade", driver);
-      executeStatementOnDriver("create database " + dbName + " WITH DBPROPERTIES('hive.compactor.worker.pool'='" + poolName + "')", driver);
+      executeStatementOnDriver("create database " + dbName + " WITH DBPROPERTIES('" + Constants.HIVE_COMPACTOR_WORKER_POOL + "'='" + poolName + "')", driver);
     }
 
     /**
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index dd5686ac84c..62d4a008027 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -25,6 +25,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hive.streaming.HiveStreamingConnection;
@@ -76,6 +79,7 @@ import org.apache.orc.impl.RecordReaderImpl;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.FieldSetter;
 
 import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runWorker;
@@ -88,73 +92,178 @@ import static org.mockito.Mockito.*;
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
-  public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable() throws Exception {
+  public void testRebalanceCompactionWithParallelDeleteAsSecondOptimisticLock() throws Exception {
+    testRebalanceCompactionWithParallelDeleteAsSecond(true);
+  }
+
+  @Test
+  public void testRebalanceCompactionWithParallelDeleteAsSecondPessimisticLock() throws Exception {
+    testRebalanceCompactionWithParallelDeleteAsSecond(false);
+  }
+
+  private void testRebalanceCompactionWithParallelDeleteAsSecond(boolean optimisticLock) throws Exception {
     conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
     conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
     conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, optimisticLock);
 
     //set grouping size to have 3 buckets, and re-create driver with the new config
     conf.set("tez.grouping.min-size", "1000");
     conf.set("tez.grouping.max-size", "80000");
     driver = new Driver(conf);
 
-    final String stageTableName = "stage_rebalance_test";
     final String tableName = "rebalance_test";
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+    TestDataProvider testDataProvider = prepareRebalanceTestData(tableName);
 
-    TestDataProvider testDataProvider = new TestDataProvider();
-    testDataProvider.createFullAcidTable(stageTableName, true, false);
-    testDataProvider.insertTestData(stageTableName, true);
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance' ORDER BY b DESC", driver);
+
+    CountDownLatch startDelete = new CountDownLatch(1);
+    CountDownLatch endDelete = new CountDownLatch(1);
+    CompactorFactory factory = Mockito.spy(CompactorFactory.getInstance());
+    doAnswer(invocation -> {
+      Object result = invocation.callRealMethod();
+      startDelete.countDown();
+      Thread.sleep(1000);
+      return result;
+    }).when(factory).getCompactorPipeline(any(), any(), any(), any());
 
-    executeStatementOnDriver("drop table if exists " + tableName, driver);
-    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
-        "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
-    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver);
+    Worker worker = new Worker(factory);
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    worker.start();
 
-    //do some single inserts to have more data in the first bucket.
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver);
-    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver);
+    if (!startDelete.await(10, TimeUnit.SECONDS)) {
+      throw new RuntimeException("Waiting for the compaction to start timed out!");
+    }
+
+    boolean aborted = false;
+    try {
+      executeStatementOnDriver("DELETE FROM " + tableName + " WHERE b = 12", driver);
+    } catch (CommandProcessorException e) {
+      if (optimisticLock) {
+        Assert.fail("In case of TXN_WRITE_X_LOCK = true, the transaction must be retried instead of being aborted.");
+      }
+      aborted = true;
+      Assert.assertEquals(e.getCause().getClass(), LockException.class);
+      Assert.assertEquals(e.getCauseMessage(), "Transaction manager has aborted the transaction txnid:21.  Reason: Aborting [txnid:21,24] due to a write conflict on default/rebalance_test committed by [txnid:20,24] d/u");
+      // Delete the record, so the rest of the test can be the same in both cases
+      executeStatementOnDriver("DELETE FROM " + tableName + " WHERE b = 12", driver);
+    } finally {
+      if(!optimisticLock && !aborted) {
+        Assert.fail("In case of TXN_WRITE_X_LOCK = false, the transaction must be aborted instead of being retried.");
+      }
+    }
+    endDelete.countDown();
+
+    worker.join();
+
+    driver.close();
+    driver = new Driver(conf);
+
+    List<String> result = execSelectAndDumpData("select * from " + tableName + " WHERE b = 12", driver,
+        "Dumping data for " + tableName + " after load:");
+    Assert.assertEquals(0, result.size());
+
+    //Check if the compaction succeed
+    verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
 
-    // Verify buckets and their content before rebalance
-    Table table = msClient.getTable("default", tableName);
-    FileSystem fs = FileSystem.get(conf);
-    Assert.assertEquals("Test setup does not match the expected: different buckets",
-        Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
-        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
     String[][] expectedBuckets = new String[][] {
         {
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
-            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
-            "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
-            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
-            "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
-            "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
             "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":2}\t15\t15",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":3}\t14\t14",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":4}\t13\t13",
         },
         {
-            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
-            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
-            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
-            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":6}\t3\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":7}\t4\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":8}\t2\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":9}\t5\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":10}\t6\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":11}\t4\t3",
         },
         {
-            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
-            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":12}\t2\t3",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":13}\t3\t3",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":14}\t6\t3",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":15}\t5\t3",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":16}\t6\t2",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t5\t2",
         },
     };
-    for(int i = 0; i < 3; i++) {
-      Assert.assertEquals("unbalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
-          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
-    }
+    verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
+        new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000020");
+  }
+
+  @Test
+  public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTableWithOrder() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1000");
+    conf.set("tez.grouping.max-size", "80000");
+    driver = new Driver(conf);
+
+    final String tableName = "rebalance_test";
+    TestDataProvider testDataProvider = prepareRebalanceTestData(tableName);
+
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance' ORDER BY b DESC", driver);
+    runWorker(conf);
+
+    driver.close();
+    driver = new Driver(conf);
+
+    //Check if the compaction succeed
+    verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
+
+    String[][] expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":2}\t15\t15",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":3}\t14\t14",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":4}\t13\t13",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":5}\t12\t12",
+        },
+        {
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":6}\t3\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":7}\t4\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":8}\t2\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":9}\t5\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":10}\t6\t4",
+            "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":11}\t4\t3",
+        },
+        {
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":12}\t2\t3",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":13}\t3\t3",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":14}\t6\t3",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":15}\t5\t3",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":16}\t6\t2",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t5\t2",
+        },
+    };
+    verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
+        new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000020");
+  }
+
+  @Test
+  public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1000");
+    conf.set("tez.grouping.max-size", "80000");
+    driver = new Driver(conf);
+
+    final String tableName = "rebalance_test";
+    TestDataProvider testDataProvider = prepareRebalanceTestData(tableName);
 
     //Try to do a rebalancing compaction
     executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver);
@@ -163,11 +272,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     //Check if the compaction succeed
     verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
 
-    // Verify buckets and their content after rebalance
-    Assert.assertEquals("Buckets does not match after compaction",
-        Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
-        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000007_v0000020"));
-    expectedBuckets = new String[][] {
+    String[][] expectedBuckets = new String[][] {
         {
             "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
             "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
@@ -193,10 +298,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
             "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17",
         },
     };
-    for(int i = 0; i < 3; i++) {
-      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
-          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
-    }
+    verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
+        new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000020");
   }
 
   @Test
@@ -278,10 +381,6 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     //Check if the compaction succeed
     verifyCompaction(1, TxnStore.CLEANING_RESPONSE);
 
-    // Verify buckets and their content after rebalance in partition ds=tomorrow
-    Assert.assertEquals("Buckets does not match after compaction",
-        Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
-        CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow", "base_0000007_v0000016"));
     expectedBuckets = new String[][] {
         {
             "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
@@ -308,10 +407,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
             "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17\ttomorrow",
         },
     };
-    for(int i = 0; i < 3; i++) {
-      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
-          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
-    }
+    verifyRebalance(testDataProvider, tableName, "ds=tomorrow", expectedBuckets,
+        new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000016");
   }
 
   @Test
@@ -347,9 +444,49 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     conf.set("tez.grouping.max-size", "80000");
     driver = new Driver(conf);
 
-    final String stageTableName = "stage_rebalance_test";
     final String tableName = "rebalance_test";
-    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+    TestDataProvider testDataProvider = prepareRebalanceTestData(tableName);
+
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance' CLUSTERED INTO 4 BUCKETS", driver);
+    runWorker(conf);
+
+    verifyCompaction(1,  TxnStore.CLEANING_RESPONSE);
+
+    String[][] expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":5}\t5\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":10}\t2\t3",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":11}\t3\t4",
+            "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12",
+            "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13",
+            "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14",
+        },
+        {
+            "{\"writeid\":5,\"bucketid\":537067520,\"rowid\":15}\t15\t15",
+            "{\"writeid\":6,\"bucketid\":537067520,\"rowid\":16}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":17}\t17\t17",
+        },
+    };
+    verifyRebalance(testDataProvider, tableName, null, expectedBuckets,
+        new String[] {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"}, "base_0000007_v0000020");
+  }
+
+  private TestDataProvider prepareRebalanceTestData(String tableName) throws Exception {
+    final String stageTableName = "stage_" + tableName;
 
     TestDataProvider testDataProvider = new TestDataProvider();
     testDataProvider.createFullAcidTable(stageTableName, true, false);
@@ -400,57 +537,24 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
             "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
         },
     };
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
     for(int i = 0; i < 3; i++) {
-      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+      Assert.assertEquals("unbalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
           testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
     }
+    return testDataProvider;
+  }
 
-    //Try to do a rebalancing compaction
-    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance' CLUSTERED INTO 4 BUCKETS", driver);
-    runWorker(conf);
-
-    //Check if the compaction succeed
-    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
-    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
-    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
-    Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
-    Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
-        "ready for cleaning", compacts.get(0).getState());
-
+  private void verifyRebalance(TestDataProvider testDataProvider, String tableName, String partitionName,
+                               String[][] expectedBucketContent, String[] bucketNames, String folderName) throws Exception {
     // Verify buckets and their content after rebalance
-    Assert.assertEquals("Buckets does not match after compaction",
-        Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"),
-        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000007_v0000020"));
-    expectedBuckets = new String[][] {
-        {
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
-            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
-        },
-        {
-            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":5}\t5\t3",
-            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4",
-            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3",
-            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
-            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3",
-        },
-        {
-            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":10}\t2\t3",
-            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":11}\t3\t4",
-            "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12",
-            "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13",
-            "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14",
-        },
-        {
-            "{\"writeid\":5,\"bucketid\":537067520,\"rowid\":15}\t15\t15",
-            "{\"writeid\":6,\"bucketid\":537067520,\"rowid\":16}\t16\t16",
-            "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":17}\t17\t17",
-        },
-    };
-    for(int i = 0; i < expectedBuckets.length; i++) {
-      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+    Table table = msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertEquals("Buckets does not match after compaction", Arrays.asList(bucketNames),
+        CompactorTestUtil.getBucketFileNames(fs, table, partitionName, folderName));
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+    for(int i = 0; i < expectedBucketContent.length; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBucketContent[i]),
           testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
     }
   }
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
index 2542909cb80..befca385ac0 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g
@@ -440,8 +440,8 @@ compactPool
 alterStatementSuffixCompact
 @init { gParent.msgs.push("compaction request"); }
 @after { gParent.msgs.pop(); }
-    : KW_COMPACT compactType=StringLiteral tableImplBuckets? blocking? compactPool? (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)?
-    -> ^(TOK_ALTERTABLE_COMPACT $compactType tableImplBuckets? blocking? compactPool? tableProperties?)
+    : KW_COMPACT compactType=StringLiteral tableImplBuckets? blocking? compactPool? (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)? orderByClause?
+    -> ^(TOK_ALTERTABLE_COMPACT $compactType tableImplBuckets? blocking? compactPool? tableProperties? orderByClause?)
     ;
 
 alterStatementSuffixSetOwner
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
index 26377ed9374..9d677cd6d24 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Schema;
 import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.ql.exec.ExplainTask;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -263,7 +264,7 @@ public class Compiler {
 
   private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException {
     if (DriverUtils.checkConcurrency(driverContext) && startImplicitTxn(driverContext.getTxnManager()) &&
-        !driverContext.getTxnManager().isTxnOpen() && txnType != TxnType.COMPACTION) {
+        !driverContext.getTxnManager().isTxnOpen() && !MetaStoreServerUtils.isCompactionTxn(txnType)) {
       String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
       if (HiveOperation.REPLDUMP.equals(driverContext.getQueryState().getHiveOperation())
          || HiveOperation.REPLLOAD.equals(driverContext.getQueryState().getHiveOperation())) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index d52c661872c..663585ef9c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -289,6 +289,7 @@ public class Driver implements IDriver {
           }
           // Since we're reusing the compiled plan, we need to update its start time for current run
           driverContext.getPlan().setQueryStartTime(driverContext.getQueryDisplay().getQueryStartTime());
+          driverContext.setRetrial(false);
         }
         // Re-check snapshot only in case we had to release locks and open a new transaction,
         // otherwise exclusive locks should protect output tables/partitions in snapshot from concurrent writes.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java
index f0662f28713..7c80a92ac61 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.ddl.table.storage.compact;
 
 import java.util.Map;
 
+import org.antlr.runtime.TokenRewriteStream;
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -56,6 +57,7 @@ public class AlterTableCompactAnalyzer extends AbstractAlterTableAnalyzer {
     Map<String, String> mapProp = null;
     boolean isBlocking = false;
     String poolName = null;
+    String orderBy = null;
     for (int i = 0; i < command.getChildCount(); i++) {
       Tree node = command.getChild(i);
       switch (node.getType()) {
@@ -75,13 +77,16 @@ public class AlterTableCompactAnalyzer extends AbstractAlterTableAnalyzer {
             throw new SemanticException("Could not parse bucket number: " + node.getChild(0).getText());
           }
           break;
+        case HiveParser.TOK_ORDERBY:
+          orderBy = this.ctx.getTokenRewriteStream().toOriginalString(node.getTokenStartIndex(), node.getTokenStopIndex());
+          break;
         default:
           break;
       }
     }
 
     AlterTableCompactDesc desc = new AlterTableCompactDesc(tableName, partitionSpec, type, isBlocking, poolName,
-        numberOfBuckets, mapProp);
+        numberOfBuckets, mapProp, orderBy);
     addInputsOutputsAlterTable(tableName, partitionSpec, desc, desc.getType(), false);
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactDesc.java
index 48876498756..eb0d0c2d2ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactDesc.java
@@ -41,10 +41,11 @@ public class AlterTableCompactDesc extends AbstractAlterTableDesc implements DDL
   private final String poolName;
   private final int numberOfBuckets;
   private final Map<String, String> properties;
+  private final String orderByClause;
   private Long writeId;
 
   public AlterTableCompactDesc(TableName tableName, Map<String, String> partitionSpec, String compactionType,
-      boolean isBlocking, String poolName, int numberOfBuckets, Map<String, String> properties)
+      boolean isBlocking, String poolName, int numberOfBuckets, Map<String, String> properties, String orderByClause)
       throws SemanticException{
     super(AlterTableType.COMPACT, tableName, partitionSpec, null, false, false, properties);
     this.tableName = tableName.getNotEmptyDbTable();
@@ -54,6 +55,7 @@ public class AlterTableCompactDesc extends AbstractAlterTableDesc implements DDL
     this.poolName = poolName;
     this.numberOfBuckets = numberOfBuckets;
     this.properties = properties;
+    this.orderByClause = orderByClause;
   }
 
   @Explain(displayName = "table name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -92,6 +94,11 @@ public class AlterTableCompactDesc extends AbstractAlterTableDesc implements DDL
     return properties;
   }
 
+  @Explain(displayName = "order by", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  public String getOrderByClause() {
+    return orderByClause;
+  }
+
   @Override
   public void setWriteId(long writeId) {
     this.writeId = writeId;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index e1bcd9a92c8..9187101367b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -103,6 +103,7 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe
     req.setProperties(desc.getProperties());
     req.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
     req.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
+    req.setOrderByClause(desc.getOrderByClause());
     if (desc.getNumberOfBuckets() > 0) {
       req.setNumberOfBuckets(desc.getNumberOfBuckets());
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/concatenate/AlterTableConcatenateAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/concatenate/AlterTableConcatenateAnalyzer.java
index c81bd2b72ef..8e4a9d1b6fd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/concatenate/AlterTableConcatenateAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/concatenate/AlterTableConcatenateAnalyzer.java
@@ -104,7 +104,7 @@ public class AlterTableConcatenateAnalyzer extends AbstractAlterTableAnalyzer {
     boolean isBlocking = !HiveConf.getBoolVar(conf, ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, false);
 
     AlterTableCompactDesc desc = new AlterTableCompactDesc(tableName, partitionSpec, CompactionType.MAJOR.name(), isBlocking,
-        poolName, 0, null);
+        poolName, 0, null, null);
     addInputsOutputsAlterTable(tableName, partitionSpec, desc, desc.getType(), false);
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
     setAcidDdlDesc(getTable(tableName), desc);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
index 50c616749a3..3ca930d5cf5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.ColumnType;
@@ -37,7 +38,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.util.DirectionUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,6 +73,7 @@ class CompactionQueryBuilder {
   private Partition sourcePartition; // for Insert in major and insert-only minor
   private String sourceTabForInsert; // for Insert
   private int numberOfBuckets;  //for rebalance
+  private String orderByClause;  //for rebalance
 
   // settable booleans
   private boolean isPartitioned; // for Create
@@ -172,6 +173,16 @@ class CompactionQueryBuilder {
     return this;
   }
 
+  /**
+   * Sets the order by clause for a rebalancing compaction. It will be used to re-order the data in the table during
+   * the compaction.
+   * @param orderByClause The ORDER BY clause to use for data reordering.
+   */
+  public CompactionQueryBuilder setOrderByClause(String orderByClause) {
+    this.orderByClause = orderByClause;
+    return this;
+  }
+
   /**
    * If true, Create operations will result in a table with partition column `file_name`.
    */
@@ -237,7 +248,7 @@ class CompactionQueryBuilder {
       query.append(" temporary external");
     }
     if (operation == Operation.INSERT) {
-      query.append(" into");
+      query.append(CompactionType.REBALANCE.equals(compactionType) ? " overwrite" : " into");
     }
     query.append(" table ");
 
@@ -306,12 +317,18 @@ class CompactionQueryBuilder {
     }
     switch (compactionType) {
       case REBALANCE: {
-        query.append("0, t2.writeId, t2.rowId / CEIL(numRows / ");
-        query.append(numberOfBuckets);
-        query.append("), t2.rowId, t2.writeId, t2.data from (select ");
-        query.append("count(ROW__ID.writeId) over() as numRows, ROW__ID.writeId as writeId, " +
-            "(row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC)) -1 AS rowId, " +
-            "NAMED_STRUCT(");
+        query.append("0, t2.writeId, t2.rowId DIV CEIL(numRows / ")
+            .append(numberOfBuckets)
+            .append("), t2.rowId, t2.writeId, t2.data from (select ")
+            .append("count(ROW__ID.writeId) over() as numRows, ");
+        if (StringUtils.isNotBlank(orderByClause)) {
+          // in case of reordering the data the writeids cannot be kept.
+          query.append("MAX(ROW__ID.writeId) over() as writeId, row_number() OVER (")
+            .append(orderByClause);
+        } else {
+          query.append("ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC");
+        }
+        query.append(") - 1 AS rowId, NAMED_STRUCT(");
         for (int i = 0; i < cols.size(); ++i) {
           query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', `")
               .append(cols.get(i).getName()).append("`");
@@ -368,8 +385,16 @@ class CompactionQueryBuilder {
     } else {
       query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
     }
+    query.append(" ");
     if (CompactionType.REBALANCE.equals(compactionType)) {
-      query.append(" order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) t2");
+      if (StringUtils.isNotBlank(orderByClause)) {
+        query.append(orderByClause);
+      } else {
+        query.append("order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC");
+      }
+      query.append(") t2");
+    } else if (CompactionType.MAJOR.equals(compactionType) && insertOnly && StringUtils.isNotBlank(orderByClause)) {
+      query.append(orderByClause);
     }
   }
 
@@ -399,7 +424,7 @@ class CompactionQueryBuilder {
       long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds();
       if (invalidWriteIds.length > 0) {
         query.append(" where `originalTransaction` not in (").append(
-            org.apache.commons.lang3.StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ","))
+            StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ","))
             .append(")");
       }
     }
@@ -462,7 +487,7 @@ class CompactionQueryBuilder {
       String columnDesc = "`" + col.getName() + "` " + (!insertOnly ? ":" : "") + columnType;
       columnDescs.add(columnDesc);
     }
-    query.append(StringUtils.join(',',columnDescs));
+    query.append(StringUtils.join(columnDescs, ','));
     query.append(!insertOnly ? ">" : "");
     query.append(") ");
   }
@@ -477,7 +502,7 @@ class CompactionQueryBuilder {
     boolean isFirst;
     List<String> buckCols = sourceTab.getSd().getBucketCols();
     if (buckCols.size() > 0) {
-      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+      query.append("CLUSTERED BY (").append(StringUtils.join(buckCols, ",")).append(") ");
       List<Order> sortCols = sourceTab.getSd().getSortCols();
       if (sortCols.size() > 0) {
         query.append("SORTED BY (");
@@ -531,14 +556,14 @@ class CompactionQueryBuilder {
       SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
       if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
         query.append(" SKEWED BY (")
-            .append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+            .append(StringUtils.join(skewedInfo.getSkewedColNames(), ", ")).append(") ON ");
         isFirst = true;
         for (List<String> colValues : skewedInfo.getSkewedColValues()) {
           if (!isFirst) {
             query.append(", ");
           }
           isFirst = false;
-          query.append("('").append(StringUtils.join("','", colValues)).append("')");
+          query.append("('").append(StringUtils.join(colValues, "','")).append("')");
         }
         query.append(") STORED AS DIRECTORIES");
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
index 591a4b4dfb3..a9849404c7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
@@ -63,7 +63,8 @@ final class RebalanceQueryCompactor extends QueryCompactor {
 
 
     List<String> createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString());
-    List<String> compactionQueries = getCompactionQueries(table, context.getPartition(), tmpTableName, numBuckets);
+    List<String> compactionQueries = getCompactionQueries(table, context.getPartition(), tmpTableName, numBuckets,
+        context.getCompactionInfo().orderByClause);
     List<String> dropQueries = getDropQueries(tmpTableName);
     runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, context.getCompactionInfo(),
         Lists.newArrayList(tmpTablePath), createQueries, compactionQueries, dropQueries,
@@ -82,7 +83,7 @@ final class RebalanceQueryCompactor extends QueryCompactor {
         .build());
   }
 
-  private List<String> getCompactionQueries(Table t, Partition p, String tmpName, int numberOfBuckets) {
+  private List<String> getCompactionQueries(Table t, Partition p, String tmpName, int numberOfBuckets, String orderByClause) {
     return Lists.newArrayList(
         new CompactionQueryBuilder(
             CompactionType.REBALANCE,
@@ -92,6 +93,7 @@ final class RebalanceQueryCompactor extends QueryCompactor {
             .setSourceTab(t)
             .setSourcePartition(p)
             .setNumberOfBuckets(numberOfBuckets)
+            .setOrderByClause(orderByClause)
             .build());
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 247164cc8a1..459b7d2bbd5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -605,10 +605,15 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
      * @throws TException
      */
     void open(CompactionInfo ci) throws TException {
-      this.txnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
+      this.txnId = msc.openTxn(ci.runAs, ci.type == CompactionType.REBALANCE ? TxnType.REBALANCE_COMPACTION : TxnType.COMPACTION);
       status = TxnStatus.OPEN;
 
-      LockRequest lockRequest = createLockRequest(ci, txnId, LockType.SHARED_READ, DataOperationType.SELECT);
+      LockRequest lockRequest;
+      if (CompactionType.REBALANCE.equals(ci.type)) {
+        lockRequest = createLockRequest(ci, txnId, LockType.EXCL_WRITE, DataOperationType.UPDATE);
+      } else {
+        lockRequest = createLockRequest(ci, txnId, LockType.SHARED_READ, DataOperationType.SELECT);
+      }
       LockResponse res = msc.lock(lockRequest);
       if (res.getState() != LockState.ACQUIRED) {
         throw new TException("Unable to acquire lock(s) on {" + ci.getFullPartitionName()
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 905d82a0659..8e1f3c52991 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -1180,19 +1180,19 @@ public class TestWorker extends CompactorTest {
   }
 
   // With high timeout, but fast run we should finish without a problem
-  @Test(timeout=1000)
+  @Test(timeout=2000)
   public void testNormalRun() throws Exception {
     runTimeoutTest(10000, false, true);
   }
 
   // With low timeout, but slow run we should finish without a problem
-  @Test(timeout=1000)
+  @Test(timeout=2000)
   public void testTimeoutWithInterrupt() throws Exception {
     runTimeoutTest(1, true, false);
   }
 
   // With low timeout, but slow run we should finish without a problem, even if the interrupt is swallowed
-  @Test(timeout=1000)
+  @Test(timeout=2000)
   public void testTimeoutWithoutInterrupt() throws Exception {
     runTimeoutTest(1, true, true);
   }
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 00343d9054a..8de1ff8573b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -618,7 +618,8 @@ int _kTxnTypeValues[] = {
   TxnType::READ_ONLY,
   TxnType::COMPACTION,
   TxnType::MATER_VIEW_REBUILD,
-  TxnType::SOFT_DELETE
+  TxnType::SOFT_DELETE,
+  TxnType::REBALANCE_COMPACTION
 };
 const char* _kTxnTypeNames[] = {
   "DEFAULT",
@@ -626,9 +627,10 @@ const char* _kTxnTypeNames[] = {
   "READ_ONLY",
   "COMPACTION",
   "MATER_VIEW_REBUILD",
-  "SOFT_DELETE"
+  "SOFT_DELETE",
+  "REBALANCE_COMPACTION"
 };
-const std::map<int, const char*> _TxnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kTxnTypeValues, _kTxnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
+const std::map<int, const char*> _TxnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kTxnTypeValues, _kTxnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
 
 std::ostream& operator<<(std::ostream& out, const TxnType::type& val) {
   std::map<int, const char*>::const_iterator it = _TxnType_VALUES_TO_NAMES.find(val);
@@ -26845,6 +26847,11 @@ void CompactionRequest::__set_numberOfBuckets(const int32_t val) {
   this->numberOfBuckets = val;
 __isset.numberOfBuckets = true;
 }
+
+void CompactionRequest::__set_orderByClause(const std::string& val) {
+  this->orderByClause = val;
+__isset.orderByClause = true;
+}
 std::ostream& operator<<(std::ostream& out, const CompactionRequest& obj)
 {
   obj.printTo(out);
@@ -26973,6 +26980,14 @@ uint32_t CompactionRequest::read(::apache::thrift::protocol::TProtocol* iprot) {
           xfer += iprot->skip(ftype);
         }
         break;
+      case 11:
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->orderByClause);
+          this->__isset.orderByClause = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -27052,6 +27067,11 @@ uint32_t CompactionRequest::write(::apache::thrift::protocol::TProtocol* oprot)
     xfer += oprot->writeI32(this->numberOfBuckets);
     xfer += oprot->writeFieldEnd();
   }
+  if (this->__isset.orderByClause) {
+    xfer += oprot->writeFieldBegin("orderByClause", ::apache::thrift::protocol::T_STRING, 11);
+    xfer += oprot->writeString(this->orderByClause);
+    xfer += oprot->writeFieldEnd();
+  }
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -27069,6 +27089,7 @@ void swap(CompactionRequest &a, CompactionRequest &b) {
   swap(a.initiatorVersion, b.initiatorVersion);
   swap(a.poolName, b.poolName);
   swap(a.numberOfBuckets, b.numberOfBuckets);
+  swap(a.orderByClause, b.orderByClause);
   swap(a.__isset, b.__isset);
 }
 
@@ -27083,6 +27104,7 @@ CompactionRequest::CompactionRequest(const CompactionRequest& other978) {
   initiatorVersion = other978.initiatorVersion;
   poolName = other978.poolName;
   numberOfBuckets = other978.numberOfBuckets;
+  orderByClause = other978.orderByClause;
   __isset = other978.__isset;
 }
 CompactionRequest& CompactionRequest::operator=(const CompactionRequest& other979) {
@@ -27096,6 +27118,7 @@ CompactionRequest& CompactionRequest::operator=(const CompactionRequest& other97
   initiatorVersion = other979.initiatorVersion;
   poolName = other979.poolName;
   numberOfBuckets = other979.numberOfBuckets;
+  orderByClause = other979.orderByClause;
   __isset = other979.__isset;
   return *this;
 }
@@ -27112,6 +27135,7 @@ void CompactionRequest::printTo(std::ostream& out) const {
   out << ", " << "initiatorVersion="; (__isset.initiatorVersion ? (out << to_string(initiatorVersion)) : (out << "<null>"));
   out << ", " << "poolName="; (__isset.poolName ? (out << to_string(poolName)) : (out << "<null>"));
   out << ", " << "numberOfBuckets="; (__isset.numberOfBuckets ? (out << to_string(numberOfBuckets)) : (out << "<null>"));
+  out << ", " << "orderByClause="; (__isset.orderByClause ? (out << to_string(orderByClause)) : (out << "<null>"));
   out << ")";
 }
 
@@ -27205,6 +27229,11 @@ void CompactionInfoStruct::__set_numberOfBuckets(const int32_t val) {
   this->numberOfBuckets = val;
 __isset.numberOfBuckets = true;
 }
+
+void CompactionInfoStruct::__set_orderByClause(const std::string& val) {
+  this->orderByClause = val;
+__isset.orderByClause = true;
+}
 std::ostream& operator<<(std::ostream& out, const CompactionInfoStruct& obj)
 {
   obj.printTo(out);
@@ -27383,6 +27412,14 @@ uint32_t CompactionInfoStruct::read(::apache::thrift::protocol::TProtocol* iprot
           xfer += iprot->skip(ftype);
         }
         break;
+      case 19:
+        if (ftype == ::apache::thrift::protocol::T_STRING) {
+          xfer += iprot->readString(this->orderByClause);
+          this->__isset.orderByClause = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -27494,6 +27531,11 @@ uint32_t CompactionInfoStruct::write(::apache::thrift::protocol::TProtocol* opro
     xfer += oprot->writeI32(this->numberOfBuckets);
     xfer += oprot->writeFieldEnd();
   }
+  if (this->__isset.orderByClause) {
+    xfer += oprot->writeFieldBegin("orderByClause", ::apache::thrift::protocol::T_STRING, 19);
+    xfer += oprot->writeString(this->orderByClause);
+    xfer += oprot->writeFieldEnd();
+  }
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -27519,6 +27561,7 @@ void swap(CompactionInfoStruct &a, CompactionInfoStruct &b) {
   swap(a.retryRetention, b.retryRetention);
   swap(a.poolname, b.poolname);
   swap(a.numberOfBuckets, b.numberOfBuckets);
+  swap(a.orderByClause, b.orderByClause);
   swap(a.__isset, b.__isset);
 }
 
@@ -27541,6 +27584,7 @@ CompactionInfoStruct::CompactionInfoStruct(const CompactionInfoStruct& other981)
   retryRetention = other981.retryRetention;
   poolname = other981.poolname;
   numberOfBuckets = other981.numberOfBuckets;
+  orderByClause = other981.orderByClause;
   __isset = other981.__isset;
 }
 CompactionInfoStruct& CompactionInfoStruct::operator=(const CompactionInfoStruct& other982) {
@@ -27562,6 +27606,7 @@ CompactionInfoStruct& CompactionInfoStruct::operator=(const CompactionInfoStruct
   retryRetention = other982.retryRetention;
   poolname = other982.poolname;
   numberOfBuckets = other982.numberOfBuckets;
+  orderByClause = other982.orderByClause;
   __isset = other982.__isset;
   return *this;
 }
@@ -27586,6 +27631,7 @@ void CompactionInfoStruct::printTo(std::ostream& out) const {
   out << ", " << "retryRetention="; (__isset.retryRetention ? (out << to_string(retryRetention)) : (out << "<null>"));
   out << ", " << "poolname="; (__isset.poolname ? (out << to_string(poolname)) : (out << "<null>"));
   out << ", " << "numberOfBuckets="; (__isset.numberOfBuckets ? (out << to_string(numberOfBuckets)) : (out << "<null>"));
+  out << ", " << "orderByClause="; (__isset.orderByClause ? (out << to_string(orderByClause)) : (out << "<null>"));
   out << ")";
 }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index ab8c1c03322..a580adfbe30 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -300,7 +300,8 @@ struct TxnType {
     READ_ONLY = 2,
     COMPACTION = 3,
     MATER_VIEW_REBUILD = 4,
-    SOFT_DELETE = 5
+    SOFT_DELETE = 5,
+    REBALANCE_COMPACTION = 6
   };
 };
 
@@ -10648,7 +10649,7 @@ void swap(HeartbeatTxnRangeResponse &a, HeartbeatTxnRangeResponse &b);
 std::ostream& operator<<(std::ostream& out, const HeartbeatTxnRangeResponse& obj);
 
 typedef struct _CompactionRequest__isset {
-  _CompactionRequest__isset() : partitionname(false), runas(false), properties(false), initiatorId(false), initiatorVersion(false), poolName(false), numberOfBuckets(false) {}
+  _CompactionRequest__isset() : partitionname(false), runas(false), properties(false), initiatorId(false), initiatorVersion(false), poolName(false), numberOfBuckets(false), orderByClause(false) {}
   bool partitionname :1;
   bool runas :1;
   bool properties :1;
@@ -10656,6 +10657,7 @@ typedef struct _CompactionRequest__isset {
   bool initiatorVersion :1;
   bool poolName :1;
   bool numberOfBuckets :1;
+  bool orderByClause :1;
 } _CompactionRequest__isset;
 
 class CompactionRequest : public virtual ::apache::thrift::TBase {
@@ -10672,7 +10674,8 @@ class CompactionRequest : public virtual ::apache::thrift::TBase {
                       initiatorId(),
                       initiatorVersion(),
                       poolName(),
-                      numberOfBuckets(0) {
+                      numberOfBuckets(0),
+                      orderByClause() {
   }
 
   virtual ~CompactionRequest() noexcept;
@@ -10690,6 +10693,7 @@ class CompactionRequest : public virtual ::apache::thrift::TBase {
   std::string initiatorVersion;
   std::string poolName;
   int32_t numberOfBuckets;
+  std::string orderByClause;
 
   _CompactionRequest__isset __isset;
 
@@ -10713,6 +10717,8 @@ class CompactionRequest : public virtual ::apache::thrift::TBase {
 
   void __set_numberOfBuckets(const int32_t val);
 
+  void __set_orderByClause(const std::string& val);
+
   bool operator == (const CompactionRequest & rhs) const
   {
     if (!(dbname == rhs.dbname))
@@ -10749,6 +10755,10 @@ class CompactionRequest : public virtual ::apache::thrift::TBase {
       return false;
     else if (__isset.numberOfBuckets && !(numberOfBuckets == rhs.numberOfBuckets))
       return false;
+    if (__isset.orderByClause != rhs.__isset.orderByClause)
+      return false;
+    else if (__isset.orderByClause && !(orderByClause == rhs.orderByClause))
+      return false;
     return true;
   }
   bool operator != (const CompactionRequest &rhs) const {
@@ -10768,7 +10778,7 @@ void swap(CompactionRequest &a, CompactionRequest &b);
 std::ostream& operator<<(std::ostream& out, const CompactionRequest& obj);
 
 typedef struct _CompactionInfoStruct__isset {
-  _CompactionInfoStruct__isset() : partitionname(false), runas(false), properties(false), toomanyaborts(false), state(false), workerId(false), start(false), highestWriteId(false), errorMessage(false), hasoldabort(false), enqueueTime(false), retryRetention(false), poolname(false), numberOfBuckets(false) {}
+  _CompactionInfoStruct__isset() : partitionname(false), runas(false), properties(false), toomanyaborts(false), state(false), workerId(false), start(false), highestWriteId(false), errorMessage(false), hasoldabort(false), enqueueTime(false), retryRetention(false), poolname(false), numberOfBuckets(false), orderByClause(false) {}
   bool partitionname :1;
   bool runas :1;
   bool properties :1;
@@ -10783,6 +10793,7 @@ typedef struct _CompactionInfoStruct__isset {
   bool retryRetention :1;
   bool poolname :1;
   bool numberOfBuckets :1;
+  bool orderByClause :1;
 } _CompactionInfoStruct__isset;
 
 class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
@@ -10808,7 +10819,8 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
                          enqueueTime(0),
                          retryRetention(0),
                          poolname(),
-                         numberOfBuckets(0) {
+                         numberOfBuckets(0),
+                         orderByClause() {
   }
 
   virtual ~CompactionInfoStruct() noexcept;
@@ -10834,6 +10846,7 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
   int64_t retryRetention;
   std::string poolname;
   int32_t numberOfBuckets;
+  std::string orderByClause;
 
   _CompactionInfoStruct__isset __isset;
 
@@ -10873,6 +10886,8 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
 
   void __set_numberOfBuckets(const int32_t val);
 
+  void __set_orderByClause(const std::string& val);
+
   bool operator == (const CompactionInfoStruct & rhs) const
   {
     if (!(id == rhs.id))
@@ -10939,6 +10954,10 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
       return false;
     else if (__isset.numberOfBuckets && !(numberOfBuckets == rhs.numberOfBuckets))
       return false;
+    if (__isset.orderByClause != rhs.__isset.orderByClause)
+      return false;
+    else if (__isset.orderByClause && !(orderByClause == rhs.orderByClause))
+      return false;
     return true;
   }
   bool operator != (const CompactionInfoStruct &rhs) const {
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
index 04865221775..cd6b544580b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
@@ -29,6 +29,7 @@ package org.apache.hadoop.hive.metastore.api;
   private static final org.apache.thrift.protocol.TField RETRY_RETENTION_FIELD_DESC = new org.apache.thrift.protocol.TField("retryRetention", org.apache.thrift.protocol.TType.I64, (short)16);
   private static final org.apache.thrift.protocol.TField POOLNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("poolname", org.apache.thrift.protocol.TType.STRING, (short)17);
   private static final org.apache.thrift.protocol.TField NUMBER_OF_BUCKETS_FIELD_DESC = new org.apache.thrift.protocol.TField("numberOfBuckets", org.apache.thrift.protocol.TType.I32, (short)18);
+  private static final org.apache.thrift.protocol.TField ORDER_BY_CLAUSE_FIELD_DESC = new org.apache.thrift.protocol.TField("orderByClause", org.apache.thrift.protocol.TType.STRING, (short)19);
 
   private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionInfoStructStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionInfoStructTupleSchemeFactory();
@@ -51,6 +52,7 @@ package org.apache.hadoop.hive.metastore.api;
   private long retryRetention; // optional
   private @org.apache.thrift.annotation.Nullable java.lang.String poolname; // optional
   private int numberOfBuckets; // optional
+  private @org.apache.thrift.annotation.Nullable java.lang.String orderByClause; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -75,7 +77,8 @@ package org.apache.hadoop.hive.metastore.api;
     ENQUEUE_TIME((short)15, "enqueueTime"),
     RETRY_RETENTION((short)16, "retryRetention"),
     POOLNAME((short)17, "poolname"),
-    NUMBER_OF_BUCKETS((short)18, "numberOfBuckets");
+    NUMBER_OF_BUCKETS((short)18, "numberOfBuckets"),
+    ORDER_BY_CLAUSE((short)19, "orderByClause");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -127,6 +130,8 @@ package org.apache.hadoop.hive.metastore.api;
           return POOLNAME;
         case 18: // NUMBER_OF_BUCKETS
           return NUMBER_OF_BUCKETS;
+        case 19: // ORDER_BY_CLAUSE
+          return ORDER_BY_CLAUSE;
         default:
           return null;
       }
@@ -177,7 +182,7 @@ package org.apache.hadoop.hive.metastore.api;
   private static final int __RETRYRETENTION_ISSET_ID = 6;
   private static final int __NUMBEROFBUCKETS_ISSET_ID = 7;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE,_Fields.HASOLDABORT,_Fields.ENQUEUE_TIME,_Fields.RETRY_RETENTION,_Fields.POOLNAME,_Fields.NUMBER_OF_BUCKETS};
+  private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE,_Fields.HASOLDABORT,_Fields.ENQUEUE_TIME,_Fields.RETRY_RETENTION,_Fields.POOLNAME,_Fields.NUMBER_OF_BUCKETS,_Fields.ORDER_BY_CLAUSE};
   public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -217,6 +222,8 @@ package org.apache.hadoop.hive.metastore.api;
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.NUMBER_OF_BUCKETS, new org.apache.thrift.meta_data.FieldMetaData("numberOfBuckets", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    tmpMap.put(_Fields.ORDER_BY_CLAUSE, new org.apache.thrift.meta_data.FieldMetaData("orderByClause", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionInfoStruct.class, metaDataMap);
   }
@@ -281,6 +288,9 @@ package org.apache.hadoop.hive.metastore.api;
       this.poolname = other.poolname;
     }
     this.numberOfBuckets = other.numberOfBuckets;
+    if (other.isSetOrderByClause()) {
+      this.orderByClause = other.orderByClause;
+    }
   }
 
   public CompactionInfoStruct deepCopy() {
@@ -315,6 +325,7 @@ package org.apache.hadoop.hive.metastore.api;
     this.poolname = null;
     setNumberOfBucketsIsSet(false);
     this.numberOfBuckets = 0;
+    this.orderByClause = null;
   }
 
   public long getId() {
@@ -741,6 +752,30 @@ package org.apache.hadoop.hive.metastore.api;
     __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __NUMBEROFBUCKETS_ISSET_ID, value);
   }
 
+  @org.apache.thrift.annotation.Nullable
+  public java.lang.String getOrderByClause() {
+    return this.orderByClause;
+  }
+
+  public void setOrderByClause(@org.apache.thrift.annotation.Nullable java.lang.String orderByClause) {
+    this.orderByClause = orderByClause;
+  }
+
+  public void unsetOrderByClause() {
+    this.orderByClause = null;
+  }
+
+  /** Returns true if field orderByClause is set (has been assigned a value) and false otherwise */
+  public boolean isSetOrderByClause() {
+    return this.orderByClause != null;
+  }
+
+  public void setOrderByClauseIsSet(boolean value) {
+    if (!value) {
+      this.orderByClause = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
     case ID:
@@ -887,6 +922,14 @@ package org.apache.hadoop.hive.metastore.api;
       }
       break;
 
+    case ORDER_BY_CLAUSE:
+      if (value == null) {
+        unsetOrderByClause();
+      } else {
+        setOrderByClause((java.lang.String)value);
+      }
+      break;
+
     }
   }
 
@@ -947,6 +990,9 @@ package org.apache.hadoop.hive.metastore.api;
     case NUMBER_OF_BUCKETS:
       return getNumberOfBuckets();
 
+    case ORDER_BY_CLAUSE:
+      return getOrderByClause();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -994,6 +1040,8 @@ package org.apache.hadoop.hive.metastore.api;
       return isSetPoolname();
     case NUMBER_OF_BUCKETS:
       return isSetNumberOfBuckets();
+    case ORDER_BY_CLAUSE:
+      return isSetOrderByClause();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -1173,6 +1221,15 @@ package org.apache.hadoop.hive.metastore.api;
         return false;
     }
 
+    boolean this_present_orderByClause = true && this.isSetOrderByClause();
+    boolean that_present_orderByClause = true && that.isSetOrderByClause();
+    if (this_present_orderByClause || that_present_orderByClause) {
+      if (!(this_present_orderByClause && that_present_orderByClause))
+        return false;
+      if (!this.orderByClause.equals(that.orderByClause))
+        return false;
+    }
+
     return true;
   }
 
@@ -1250,6 +1307,10 @@ package org.apache.hadoop.hive.metastore.api;
     if (isSetNumberOfBuckets())
       hashCode = hashCode * 8191 + numberOfBuckets;
 
+    hashCode = hashCode * 8191 + ((isSetOrderByClause()) ? 131071 : 524287);
+    if (isSetOrderByClause())
+      hashCode = hashCode * 8191 + orderByClause.hashCode();
+
     return hashCode;
   }
 
@@ -1441,6 +1502,16 @@ package org.apache.hadoop.hive.metastore.api;
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetOrderByClause(), other.isSetOrderByClause());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetOrderByClause()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.orderByClause, other.orderByClause);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1601,6 +1672,16 @@ package org.apache.hadoop.hive.metastore.api;
       sb.append(this.numberOfBuckets);
       first = false;
     }
+    if (isSetOrderByClause()) {
+      if (!first) sb.append(", ");
+      sb.append("orderByClause:");
+      if (this.orderByClause == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.orderByClause);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -1806,6 +1887,14 @@ package org.apache.hadoop.hive.metastore.api;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 19: // ORDER_BY_CLAUSE
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.orderByClause = iprot.readString();
+              struct.setOrderByClauseIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1921,6 +2010,13 @@ package org.apache.hadoop.hive.metastore.api;
         oprot.writeI32(struct.numberOfBuckets);
         oprot.writeFieldEnd();
       }
+      if (struct.orderByClause != null) {
+        if (struct.isSetOrderByClause()) {
+          oprot.writeFieldBegin(ORDER_BY_CLAUSE_FIELD_DESC);
+          oprot.writeString(struct.orderByClause);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1985,7 +2081,10 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetNumberOfBuckets()) {
         optionals.set(13);
       }
-      oprot.writeBitSet(optionals, 14);
+      if (struct.isSetOrderByClause()) {
+        optionals.set(14);
+      }
+      oprot.writeBitSet(optionals, 15);
       if (struct.isSetPartitionname()) {
         oprot.writeString(struct.partitionname);
       }
@@ -2028,6 +2127,9 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetNumberOfBuckets()) {
         oprot.writeI32(struct.numberOfBuckets);
       }
+      if (struct.isSetOrderByClause()) {
+        oprot.writeString(struct.orderByClause);
+      }
     }
 
     @Override
@@ -2041,7 +2143,7 @@ package org.apache.hadoop.hive.metastore.api;
       struct.setTablenameIsSet(true);
       struct.type = org.apache.hadoop.hive.metastore.api.CompactionType.findByValue(iprot.readI32());
       struct.setTypeIsSet(true);
-      java.util.BitSet incoming = iprot.readBitSet(14);
+      java.util.BitSet incoming = iprot.readBitSet(15);
       if (incoming.get(0)) {
         struct.partitionname = iprot.readString();
         struct.setPartitionnameIsSet(true);
@@ -2098,6 +2200,10 @@ package org.apache.hadoop.hive.metastore.api;
         struct.numberOfBuckets = iprot.readI32();
         struct.setNumberOfBucketsIsSet(true);
       }
+      if (incoming.get(14)) {
+        struct.orderByClause = iprot.readString();
+        struct.setOrderByClauseIsSet(true);
+      }
     }
   }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionRequest.java
index fee7d81d95c..07fb642b7c4 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionRequest.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.api;
   private static final org.apache.thrift.protocol.TField INITIATOR_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("initiatorVersion", org.apache.thrift.protocol.TType.STRING, (short)8);
   private static final org.apache.thrift.protocol.TField POOL_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("poolName", org.apache.thrift.protocol.TType.STRING, (short)9);
   private static final org.apache.thrift.protocol.TField NUMBER_OF_BUCKETS_FIELD_DESC = new org.apache.thrift.protocol.TField("numberOfBuckets", org.apache.thrift.protocol.TType.I32, (short)10);
+  private static final org.apache.thrift.protocol.TField ORDER_BY_CLAUSE_FIELD_DESC = new org.apache.thrift.protocol.TField("orderByClause", org.apache.thrift.protocol.TType.STRING, (short)11);
 
   private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionRequestStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionRequestTupleSchemeFactory();
@@ -35,6 +36,7 @@ package org.apache.hadoop.hive.metastore.api;
   private @org.apache.thrift.annotation.Nullable java.lang.String initiatorVersion; // optional
   private @org.apache.thrift.annotation.Nullable java.lang.String poolName; // optional
   private int numberOfBuckets; // optional
+  private @org.apache.thrift.annotation.Nullable java.lang.String orderByClause; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -51,7 +53,8 @@ package org.apache.hadoop.hive.metastore.api;
     INITIATOR_ID((short)7, "initiatorId"),
     INITIATOR_VERSION((short)8, "initiatorVersion"),
     POOL_NAME((short)9, "poolName"),
-    NUMBER_OF_BUCKETS((short)10, "numberOfBuckets");
+    NUMBER_OF_BUCKETS((short)10, "numberOfBuckets"),
+    ORDER_BY_CLAUSE((short)11, "orderByClause");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -87,6 +90,8 @@ package org.apache.hadoop.hive.metastore.api;
           return POOL_NAME;
         case 10: // NUMBER_OF_BUCKETS
           return NUMBER_OF_BUCKETS;
+        case 11: // ORDER_BY_CLAUSE
+          return ORDER_BY_CLAUSE;
         default:
           return null;
       }
@@ -130,7 +135,7 @@ package org.apache.hadoop.hive.metastore.api;
   // isset id assignments
   private static final int __NUMBEROFBUCKETS_ISSET_ID = 0;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION,_Fields.POOL_NAME,_Fields.NUMBER_OF_BUCKETS};
+  private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION,_Fields.POOL_NAME,_Fields.NUMBER_OF_BUCKETS,_Fields.ORDER_BY_CLAUSE};
   public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -156,6 +161,8 @@ package org.apache.hadoop.hive.metastore.api;
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     tmpMap.put(_Fields.NUMBER_OF_BUCKETS, new org.apache.thrift.meta_data.FieldMetaData("numberOfBuckets", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    tmpMap.put(_Fields.ORDER_BY_CLAUSE, new org.apache.thrift.meta_data.FieldMetaData("orderByClause", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionRequest.class, metaDataMap);
   }
@@ -208,6 +215,9 @@ package org.apache.hadoop.hive.metastore.api;
       this.poolName = other.poolName;
     }
     this.numberOfBuckets = other.numberOfBuckets;
+    if (other.isSetOrderByClause()) {
+      this.orderByClause = other.orderByClause;
+    }
   }
 
   public CompactionRequest deepCopy() {
@@ -227,6 +237,7 @@ package org.apache.hadoop.hive.metastore.api;
     this.poolName = null;
     setNumberOfBucketsIsSet(false);
     this.numberOfBuckets = 0;
+    this.orderByClause = null;
   }
 
   @org.apache.thrift.annotation.Nullable
@@ -486,6 +497,30 @@ package org.apache.hadoop.hive.metastore.api;
     __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __NUMBEROFBUCKETS_ISSET_ID, value);
   }
 
+  @org.apache.thrift.annotation.Nullable
+  public java.lang.String getOrderByClause() {
+    return this.orderByClause;
+  }
+
+  public void setOrderByClause(@org.apache.thrift.annotation.Nullable java.lang.String orderByClause) {
+    this.orderByClause = orderByClause;
+  }
+
+  public void unsetOrderByClause() {
+    this.orderByClause = null;
+  }
+
+  /** Returns true if field orderByClause is set (has been assigned a value) and false otherwise */
+  public boolean isSetOrderByClause() {
+    return this.orderByClause != null;
+  }
+
+  public void setOrderByClauseIsSet(boolean value) {
+    if (!value) {
+      this.orderByClause = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
     case DBNAME:
@@ -568,6 +603,14 @@ package org.apache.hadoop.hive.metastore.api;
       }
       break;
 
+    case ORDER_BY_CLAUSE:
+      if (value == null) {
+        unsetOrderByClause();
+      } else {
+        setOrderByClause((java.lang.String)value);
+      }
+      break;
+
     }
   }
 
@@ -604,6 +647,9 @@ package org.apache.hadoop.hive.metastore.api;
     case NUMBER_OF_BUCKETS:
       return getNumberOfBuckets();
 
+    case ORDER_BY_CLAUSE:
+      return getOrderByClause();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -635,6 +681,8 @@ package org.apache.hadoop.hive.metastore.api;
       return isSetPoolName();
     case NUMBER_OF_BUCKETS:
       return isSetNumberOfBuckets();
+    case ORDER_BY_CLAUSE:
+      return isSetOrderByClause();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -742,6 +790,15 @@ package org.apache.hadoop.hive.metastore.api;
         return false;
     }
 
+    boolean this_present_orderByClause = true && this.isSetOrderByClause();
+    boolean that_present_orderByClause = true && that.isSetOrderByClause();
+    if (this_present_orderByClause || that_present_orderByClause) {
+      if (!(this_present_orderByClause && that_present_orderByClause))
+        return false;
+      if (!this.orderByClause.equals(that.orderByClause))
+        return false;
+    }
+
     return true;
   }
 
@@ -789,6 +846,10 @@ package org.apache.hadoop.hive.metastore.api;
     if (isSetNumberOfBuckets())
       hashCode = hashCode * 8191 + numberOfBuckets;
 
+    hashCode = hashCode * 8191 + ((isSetOrderByClause()) ? 131071 : 524287);
+    if (isSetOrderByClause())
+      hashCode = hashCode * 8191 + orderByClause.hashCode();
+
     return hashCode;
   }
 
@@ -900,6 +961,16 @@ package org.apache.hadoop.hive.metastore.api;
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetOrderByClause(), other.isSetOrderByClause());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetOrderByClause()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.orderByClause, other.orderByClause);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1010,6 +1081,16 @@ package org.apache.hadoop.hive.metastore.api;
       sb.append(this.numberOfBuckets);
       first = false;
     }
+    if (isSetOrderByClause()) {
+      if (!first) sb.append(", ");
+      sb.append("orderByClause:");
+      if (this.orderByClause == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.orderByClause);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -1159,6 +1240,14 @@ package org.apache.hadoop.hive.metastore.api;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 11: // ORDER_BY_CLAUSE
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.orderByClause = iprot.readString();
+              struct.setOrderByClauseIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1242,6 +1331,13 @@ package org.apache.hadoop.hive.metastore.api;
         oprot.writeI32(struct.numberOfBuckets);
         oprot.writeFieldEnd();
       }
+      if (struct.orderByClause != null) {
+        if (struct.isSetOrderByClause()) {
+          oprot.writeFieldBegin(ORDER_BY_CLAUSE_FIELD_DESC);
+          oprot.writeString(struct.orderByClause);
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1284,7 +1380,10 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetNumberOfBuckets()) {
         optionals.set(6);
       }
-      oprot.writeBitSet(optionals, 7);
+      if (struct.isSetOrderByClause()) {
+        optionals.set(7);
+      }
+      oprot.writeBitSet(optionals, 8);
       if (struct.isSetPartitionname()) {
         oprot.writeString(struct.partitionname);
       }
@@ -1313,6 +1412,9 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetNumberOfBuckets()) {
         oprot.writeI32(struct.numberOfBuckets);
       }
+      if (struct.isSetOrderByClause()) {
+        oprot.writeString(struct.orderByClause);
+      }
     }
 
     @Override
@@ -1324,7 +1426,7 @@ package org.apache.hadoop.hive.metastore.api;
       struct.setTablenameIsSet(true);
       struct.type = org.apache.hadoop.hive.metastore.api.CompactionType.findByValue(iprot.readI32());
       struct.setTypeIsSet(true);
-      java.util.BitSet incoming = iprot.readBitSet(7);
+      java.util.BitSet incoming = iprot.readBitSet(8);
       if (incoming.get(0)) {
         struct.partitionname = iprot.readString();
         struct.setPartitionnameIsSet(true);
@@ -1364,6 +1466,10 @@ package org.apache.hadoop.hive.metastore.api;
         struct.numberOfBuckets = iprot.readI32();
         struct.setNumberOfBucketsIsSet(true);
       }
+      if (incoming.get(7)) {
+        struct.orderByClause = iprot.readString();
+        struct.setOrderByClauseIsSet(true);
+      }
     }
   }
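
    Not part of the patch, but for orientation: a minimal Java sketch of how a caller could populate the new
    optional field through the generated accessors added above. The three-argument constructor,
    CompactionType.REBALANCE, and the exact format of the clause string are assumptions taken from the
    surrounding codebase, not from this diff.

        import org.apache.hadoop.hive.metastore.api.CompactionRequest;
        import org.apache.hadoop.hive.metastore.api.CompactionType;

        public class OrderByClauseSketch {
          public static void main(String[] args) {
            // Assumed: the generated required-fields constructor and the REBALANCE compaction type.
            CompactionRequest rqst = new CompactionRequest("default", "acid_tbl", CompactionType.REBALANCE);

            // New optional Thrift field (id 11); the value shown is illustrative only.
            rqst.setOrderByClause("ORDER BY id");

            // The optional-field helpers added in this change.
            if (rqst.isSetOrderByClause()) {
              System.out.println("order by: " + rqst.getOrderByClause());
            }
          }
        }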
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java
index a785fd4fb4e..8b2aed925c3 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java
@@ -14,7 +14,8 @@ public enum TxnType implements org.apache.thrift.TEnum {
   READ_ONLY(2),
   COMPACTION(3),
   MATER_VIEW_REBUILD(4),
-  SOFT_DELETE(5);
+  SOFT_DELETE(5),
+  REBALANCE_COMPACTION(6);
 
   private final int value;
 
@@ -48,6 +49,8 @@ public enum TxnType implements org.apache.thrift.TEnum {
         return MATER_VIEW_REBUILD;
       case 5:
         return SOFT_DELETE;
+      case 6:
+        return REBALANCE_COMPACTION;
       default:
         return null;
     }
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php
index efbf2543bb4..338e1cb962b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php
@@ -112,6 +112,11 @@ class CompactionInfoStruct
             'isRequired' => false,
             'type' => TType::I32,
         ),
+        19 => array(
+            'var' => 'orderByClause',
+            'isRequired' => false,
+            'type' => TType::STRING,
+        ),
     );
 
     /**
@@ -186,6 +191,10 @@ class CompactionInfoStruct
      * @var int
      */
     public $numberOfBuckets = null;
+    /**
+     * @var string
+     */
+    public $orderByClause = null;
 
     public function __construct($vals = null)
     {
@@ -244,6 +253,9 @@ class CompactionInfoStruct
             if (isset($vals['numberOfBuckets'])) {
                 $this->numberOfBuckets = $vals['numberOfBuckets'];
             }
+            if (isset($vals['orderByClause'])) {
+                $this->orderByClause = $vals['orderByClause'];
+            }
         }
     }
 
@@ -392,6 +404,13 @@ class CompactionInfoStruct
                         $xfer += $input->skip($ftype);
                     }
                     break;
+                case 19:
+                    if ($ftype == TType::STRING) {
+                        $xfer += $input->readString($this->orderByClause);
+                    } else {
+                        $xfer += $input->skip($ftype);
+                    }
+                    break;
                 default:
                     $xfer += $input->skip($ftype);
                     break;
@@ -496,6 +515,11 @@ class CompactionInfoStruct
             $xfer += $output->writeI32($this->numberOfBuckets);
             $xfer += $output->writeFieldEnd();
         }
+        if ($this->orderByClause !== null) {
+            $xfer += $output->writeFieldBegin('orderByClause', TType::STRING, 19);
+            $xfer += $output->writeString($this->orderByClause);
+            $xfer += $output->writeFieldEnd();
+        }
         $xfer += $output->writeFieldStop();
         $xfer += $output->writeStructEnd();
         return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionRequest.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionRequest.php
index 8672354e151..a087f829c3b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionRequest.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionRequest.php
@@ -80,6 +80,11 @@ class CompactionRequest
             'isRequired' => false,
             'type' => TType::I32,
         ),
+        11 => array(
+            'var' => 'orderByClause',
+            'isRequired' => false,
+            'type' => TType::STRING,
+        ),
     );
 
     /**
@@ -122,6 +127,10 @@ class CompactionRequest
      * @var int
      */
     public $numberOfBuckets = null;
+    /**
+     * @var string
+     */
+    public $orderByClause = null;
 
     public function __construct($vals = null)
     {
@@ -156,6 +165,9 @@ class CompactionRequest
             if (isset($vals['numberOfBuckets'])) {
                 $this->numberOfBuckets = $vals['numberOfBuckets'];
             }
+            if (isset($vals['orderByClause'])) {
+                $this->orderByClause = $vals['orderByClause'];
+            }
         }
     }
 
@@ -260,6 +272,13 @@ class CompactionRequest
                         $xfer += $input->skip($ftype);
                     }
                     break;
+                case 11:
+                    if ($ftype == TType::STRING) {
+                        $xfer += $input->readString($this->orderByClause);
+                    } else {
+                        $xfer += $input->skip($ftype);
+                    }
+                    break;
                 default:
                     $xfer += $input->skip($ftype);
                     break;
@@ -332,6 +351,11 @@ class CompactionRequest
             $xfer += $output->writeI32($this->numberOfBuckets);
             $xfer += $output->writeFieldEnd();
         }
+        if ($this->orderByClause !== null) {
+            $xfer += $output->writeFieldBegin('orderByClause', TType::STRING, 11);
+            $xfer += $output->writeString($this->orderByClause);
+            $xfer += $output->writeFieldEnd();
+        }
         $xfer += $output->writeFieldStop();
         $xfer += $output->writeStructEnd();
         return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php
index d9200832e4a..6183e577df2 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php
@@ -30,6 +30,8 @@ final class TxnType
 
     const SOFT_DELETE = 5;
 
+    const REBALANCE_COMPACTION = 6;
+
     static public $__names = array(
         0 => 'DEFAULT',
         1 => 'REPL_CREATED',
@@ -37,6 +39,7 @@ final class TxnType
         3 => 'COMPACTION',
         4 => 'MATER_VIEW_REBUILD',
         5 => 'SOFT_DELETE',
+        6 => 'REBALANCE_COMPACTION',
     );
 }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 6f66497ca19..e95e0285f6c 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -381,6 +381,7 @@ class TxnType(object):
     COMPACTION = 3
     MATER_VIEW_REBUILD = 4
     SOFT_DELETE = 5
+    REBALANCE_COMPACTION = 6
 
     _VALUES_TO_NAMES = {
         0: "DEFAULT",
@@ -389,6 +390,7 @@ class TxnType(object):
         3: "COMPACTION",
         4: "MATER_VIEW_REBUILD",
         5: "SOFT_DELETE",
+        6: "REBALANCE_COMPACTION",
     }
 
     _NAMES_TO_VALUES = {
@@ -398,6 +400,7 @@ class TxnType(object):
         "COMPACTION": 3,
         "MATER_VIEW_REBUILD": 4,
         "SOFT_DELETE": 5,
+        "REBALANCE_COMPACTION": 6,
     }
 
 
@@ -15355,11 +15358,12 @@ class CompactionRequest(object):
      - initiatorVersion
      - poolName
      - numberOfBuckets
+     - orderByClause
 
     """
 
 
-    def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, initiatorId=None, initiatorVersion=None, poolName=None, numberOfBuckets=None,):
+    def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, initiatorId=None, initiatorVersion=None, poolName=None, numberOfBuckets=None, orderByClause=None,):
         self.dbname = dbname
         self.tablename = tablename
         self.partitionname = partitionname
@@ -15370,6 +15374,7 @@ class CompactionRequest(object):
         self.initiatorVersion = initiatorVersion
         self.poolName = poolName
         self.numberOfBuckets = numberOfBuckets
+        self.orderByClause = orderByClause
 
     def read(self, iprot):
         if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -15436,6 +15441,11 @@ class CompactionRequest(object):
                     self.numberOfBuckets = iprot.readI32()
                 else:
                     iprot.skip(ftype)
+            elif fid == 11:
+                if ftype == TType.STRING:
+                    self.orderByClause = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString()
+                else:
+                    iprot.skip(ftype)
             else:
                 iprot.skip(ftype)
             iprot.readFieldEnd()
@@ -15490,6 +15500,10 @@ class CompactionRequest(object):
             oprot.writeFieldBegin('numberOfBuckets', TType.I32, 10)
             oprot.writeI32(self.numberOfBuckets)
             oprot.writeFieldEnd()
+        if self.orderByClause is not None:
+            oprot.writeFieldBegin('orderByClause', TType.STRING, 11)
+            oprot.writeString(self.orderByClause.encode('utf-8') if sys.version_info[0] == 2 else self.orderByClause)
+            oprot.writeFieldEnd()
         oprot.writeFieldStop()
         oprot.writeStructEnd()
 
@@ -15535,11 +15549,12 @@ class CompactionInfoStruct(object):
      - retryRetention
      - poolname
      - numberOfBuckets
+     - orderByClause
 
     """
 
 
-    def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None, hasoldabort=None, enqueueTime=None, retryRetention=None, poolname=None, numberOfBuckets=None,):
+    def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None, hasoldabort=None, enqueueTime=None, retryRetention=None, poolname=None, numberOfBuckets=None, orderByClause=None,):
         self.id = id
         self.dbname = dbname
         self.tablename = tablename
@@ -15558,6 +15573,7 @@ class CompactionInfoStruct(object):
         self.retryRetention = retryRetention
         self.poolname = poolname
         self.numberOfBuckets = numberOfBuckets
+        self.orderByClause = orderByClause
 
     def read(self, iprot):
         if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -15658,6 +15674,11 @@ class CompactionInfoStruct(object):
                     self.numberOfBuckets = iprot.readI32()
                 else:
                     iprot.skip(ftype)
+            elif fid == 19:
+                if ftype == TType.STRING:
+                    self.orderByClause = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString()
+                else:
+                    iprot.skip(ftype)
             else:
                 iprot.skip(ftype)
             iprot.readFieldEnd()
@@ -15740,6 +15761,10 @@ class CompactionInfoStruct(object):
             oprot.writeFieldBegin('numberOfBuckets', TType.I32, 18)
             oprot.writeI32(self.numberOfBuckets)
             oprot.writeFieldEnd()
+        if self.orderByClause is not None:
+            oprot.writeFieldBegin('orderByClause', TType.STRING, 19)
+            oprot.writeString(self.orderByClause.encode('utf-8') if sys.version_info[0] == 2 else self.orderByClause)
+            oprot.writeFieldEnd()
         oprot.writeFieldStop()
         oprot.writeStructEnd()
 
@@ -31667,6 +31692,7 @@ CompactionRequest.thrift_spec = (
     (8, TType.STRING, 'initiatorVersion', 'UTF8', None, ),  # 8
     (9, TType.STRING, 'poolName', 'UTF8', None, ),  # 9
     (10, TType.I32, 'numberOfBuckets', None, None, ),  # 10
+    (11, TType.STRING, 'orderByClause', 'UTF8', None, ),  # 11
 )
 all_structs.append(CompactionInfoStruct)
 CompactionInfoStruct.thrift_spec = (
@@ -31689,6 +31715,7 @@ CompactionInfoStruct.thrift_spec = (
     (16, TType.I64, 'retryRetention', None, None, ),  # 16
     (17, TType.STRING, 'poolname', 'UTF8', None, ),  # 17
     (18, TType.I32, 'numberOfBuckets', None, None, ),  # 18
+    (19, TType.STRING, 'orderByClause', 'UTF8', None, ),  # 19
 )
 all_structs.append(OptionalCompactionInfoStruct)
 OptionalCompactionInfoStruct.thrift_spec = (
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 98ba4f54dd5..b65733171e5 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -172,8 +172,9 @@ module TxnType
   COMPACTION = 3
   MATER_VIEW_REBUILD = 4
   SOFT_DELETE = 5
-  VALUE_MAP = {0 => "DEFAULT", 1 => "REPL_CREATED", 2 => "READ_ONLY", 3 => "COMPACTION", 4 => "MATER_VIEW_REBUILD", 5 => "SOFT_DELETE"}
-  VALID_VALUES = Set.new([DEFAULT, REPL_CREATED, READ_ONLY, COMPACTION, MATER_VIEW_REBUILD, SOFT_DELETE]).freeze
+  REBALANCE_COMPACTION = 6
+  VALUE_MAP = {0 => "DEFAULT", 1 => "REPL_CREATED", 2 => "READ_ONLY", 3 => "COMPACTION", 4 => "MATER_VIEW_REBUILD", 5 => "SOFT_DELETE", 6 => "REBALANCE_COMPACTION"}
+  VALID_VALUES = Set.new([DEFAULT, REPL_CREATED, READ_ONLY, COMPACTION, MATER_VIEW_REBUILD, SOFT_DELETE, REBALANCE_COMPACTION]).freeze
 end
 
 module GetTablesExtRequestFields
@@ -4481,6 +4482,7 @@ class CompactionRequest
   INITIATORVERSION = 8
   POOLNAME = 9
   NUMBEROFBUCKETS = 10
+  ORDERBYCLAUSE = 11
 
   FIELDS = {
     DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'},
@@ -4492,7 +4494,8 @@ class CompactionRequest
     INITIATORID => {:type => ::Thrift::Types::STRING, :name => 'initiatorId', :optional => true},
     INITIATORVERSION => {:type => ::Thrift::Types::STRING, :name => 'initiatorVersion', :optional => true},
     POOLNAME => {:type => ::Thrift::Types::STRING, :name => 'poolName', :optional => true},
-    NUMBEROFBUCKETS => {:type => ::Thrift::Types::I32, :name => 'numberOfBuckets', :optional => true}
+    NUMBEROFBUCKETS => {:type => ::Thrift::Types::I32, :name => 'numberOfBuckets', :optional => true},
+    ORDERBYCLAUSE => {:type => ::Thrift::Types::STRING, :name => 'orderByClause', :optional => true}
   }
 
   def struct_fields; FIELDS; end
@@ -4529,6 +4532,7 @@ class CompactionInfoStruct
   RETRYRETENTION = 16
   POOLNAME = 17
   NUMBEROFBUCKETS = 18
+  ORDERBYCLAUSE = 19
 
   FIELDS = {
     ID => {:type => ::Thrift::Types::I64, :name => 'id'},
@@ -4548,7 +4552,8 @@ class CompactionInfoStruct
     ENQUEUETIME => {:type => ::Thrift::Types::I64, :name => 'enqueueTime', :optional => true},
     RETRYRETENTION => {:type => ::Thrift::Types::I64, :name => 'retryRetention', :optional => true},
     POOLNAME => {:type => ::Thrift::Types::STRING, :name => 'poolname', :optional => true},
-    NUMBEROFBUCKETS => {:type => ::Thrift::Types::I32, :name => 'numberOfBuckets', :optional => true}
+    NUMBEROFBUCKETS => {:type => ::Thrift::Types::I32, :name => 'numberOfBuckets', :optional => true},
+    ORDERBYCLAUSE => {:type => ::Thrift::Types::STRING, :name => 'orderByClause', :optional => true}
   }
 
   def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java
index 8d3f060e8d0..f3f0e5d939b 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java
@@ -61,7 +61,7 @@ public class TxnQueries {
                     "   \"CQ_WORKER_VERSION\" AS \"CC_WORKER_VERSION\", \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" AS \"CC_INITIATOR_VERSION\", " +
                     "   \"CQ_RETRY_RETENTION\" AS \"CC_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " +
                     "   \"CQ_COMMIT_TIME\" AS \"CC_COMMIT_TIME\", \"CQ_POOL_NAME\" AS \"CC_POOL_NAME\",  " +
-                    "   \"CQ_NUMBER_OF_BUCKETS\" AS \"CC_NUMBER_OF_BUCKETS\" " +
+                    "   \"CQ_NUMBER_OF_BUCKETS\" AS \"CC_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\" AS \"CC_ORDER_BY\" " +
                     "   FROM " +
                     "   \"COMPACTION_QUEUE\" " +
                     "   UNION ALL " +
@@ -71,7 +71,7 @@ public class TxnQueries {
                     "   \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", " +
                     "   \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " +
                     "    -1 , \"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_NEXT_TXN_ID\", \"CC_POOL_NAME\", " +
-                    "   \"CC_NUMBER_OF_BUCKETS\" " +
+                    "   \"CC_NUMBER_OF_BUCKETS\", \"CC_ORDER_BY\" " +
                     "   FROM   " +
                     "   \"COMPLETED_COMPACTIONS\") XX ";
 
@@ -82,6 +82,7 @@ public class TxnQueries {
                     "   \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", \"CC_START\", \"CC_END\", \"CC_RUN_AS\", " +
                     "   \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", " +
                     "   \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\"," +
-                    "   \"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\") " +
-                    "   VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+                    "   \"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\", " +
+                    "   \"CC_ORDER_BY\") " +
+                    "   VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
 }
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index af20b469d48..bca23c8111c 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1005,7 +1005,8 @@ enum TxnType {
     READ_ONLY    = 2,
     COMPACTION   = 3,
     MATER_VIEW_REBUILD = 4,
-    SOFT_DELETE  = 5
+    SOFT_DELETE  = 5,
+    REBALANCE_COMPACTION = 6
 }
 
 // specifies which info to return with GetTablesExtRequest
@@ -1294,6 +1295,7 @@ struct CompactionRequest {
     8: optional string initiatorVersion
     9: optional string poolName
     10: optional i32 numberOfBuckets
+    11: optional string orderByClause;
 }
 
 struct CompactionInfoStruct {
@@ -1315,6 +1317,7 @@ struct CompactionInfoStruct {
     16: optional i64 retryRetention,
     17: optional string poolname
     18: optional i32 numberOfBuckets
+    19: optional string orderByClause;
 }
 
 struct OptionalCompactionInfoStruct {
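
    Both additions use a fresh, highest field id (11 on CompactionRequest, 19 on CompactionInfoStruct) and are
    optional, so peers that do not know the field remain compatible. A small round-trip sketch, assuming
    libthrift's TMemoryBuffer/TBinaryProtocol and the generated write/read methods, shows that a request
    written without the field simply omits it:

        import org.apache.hadoop.hive.metastore.api.CompactionRequest;
        import org.apache.hadoop.hive.metastore.api.CompactionType;
        import org.apache.thrift.protocol.TBinaryProtocol;
        import org.apache.thrift.transport.TMemoryBuffer;

        public class WireCompatSketch {
          public static void main(String[] args) throws Exception {
            TMemoryBuffer buffer = new TMemoryBuffer(1024);
            TBinaryProtocol protocol = new TBinaryProtocol(buffer);

            // An "old style" request that never sets orderByClause.
            CompactionRequest sent = new CompactionRequest("default", "acid_tbl", CompactionType.MAJOR);
            sent.write(protocol);   // field 11 is not written because isSetOrderByClause() is false

            CompactionRequest received = new CompactionRequest();
            received.read(protocol);
            System.out.println(received.isSetOrderByClause());   // false
          }
        }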
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 46e88a3d6b0..5e5db301d63 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -65,6 +67,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
   public long commitTime = 0;
   public String poolName;
   public int numberOfBuckets = 0;
+  public String orderByClause;
 
   /**
    * The highest write id that the compaction job will pay attention to.
@@ -148,31 +151,35 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
   public int compareTo(CompactionInfo o) {
     return getFullPartitionName().compareTo(o.getFullPartitionName());
   }
+
   public String toString() {
-    return "id:" + id + "," +
-      "dbname:" + dbname + "," +
-      "tableName:" + tableName + "," +
-      "partName:" + partName + "," +
-      "state:" + state + "," +
-      "type:" + type + "," +
-      "enqueueTime:" + enqueueTime + "," +
-      "commitTime:" + commitTime + "," +
-      "start:" + start + "," +
-      "properties:" + properties + "," +
-      "runAs:" + runAs + "," +
-      "tooManyAborts:" + tooManyAborts + "," +
-      "hasOldAbort:" + hasOldAbort + "," +
-      "highestWriteId:" + highestWriteId + "," +
-      "errorMessage:" + errorMessage + "," +
-      "workerId: " + workerId + "," +
-      "workerVersion: " + workerVersion + "," +
-      "initiatorId: " + initiatorId + "," +
-      "initiatorVersion: " + initiatorVersion + "," +
-      "retryRetention" + retryRetention + "," +
-      "txnId" + txnId + "," +
-      "nextTxnId" + nextTxnId + "," +
-      "poolname" + poolName + "," +
-      "numberOfBuckets" + numberOfBuckets;
+    return new ToStringBuilder(this)
+        .append("id", id)
+        .append("dbname", dbname)
+        .append("tableName", tableName)
+        .append("partName", partName)
+        .append("state", state)
+        .append("type", type)
+        .append("enqueueTime", enqueueTime)
+        .append("commitTime", commitTime)
+        .append("start", start)
+        .append("properties", properties)
+        .append("runAs", runAs)
+        .append("tooManyAborts", tooManyAborts)
+        .append("hasOldAbort", hasOldAbort)
+        .append("highestWriteId", highestWriteId)
+        .append("errorMessage", errorMessage)
+        .append("workerId", workerId)
+        .append("workerVersion", workerVersion)
+        .append("initiatorId", initiatorId)
+        .append("initiatorVersion", initiatorVersion)
+        .append("retryRetention", retryRetention)
+        .append("txnId", txnId)
+        .append("nextTxnId", nextTxnId)
+        .append("poolName", poolName)
+        .append("numberOfBuckets", numberOfBuckets)
+        .append("orderByClause", orderByClause)
+        .build();
   }
 
   @Override
@@ -225,6 +232,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     fullCi.commitTime = rs.getLong(22);
     fullCi.poolName = rs.getString(23);
     fullCi.numberOfBuckets = rs.getInt(24);
+    fullCi.orderByClause = rs.getString(25);
     return fullCi;
   }
   static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException, MetaException {
@@ -252,6 +260,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     pStmt.setLong(22, ci.commitTime);
     pStmt.setString(23, ci.poolName);
     pStmt.setInt(24, ci.numberOfBuckets);
+    pStmt.setString(25, ci.orderByClause);
   }
 
   public static CompactionInfo compactionStructToInfo(CompactionInfoStruct cr) {
@@ -295,6 +304,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     if (cr.isSetNumberOfBuckets()) {
       ci.numberOfBuckets = cr.getNumberOfBuckets();
     }
+    if (cr.isSetOrderByClause()) {
+      ci.orderByClause = cr.getOrderByClause();
+    }
     return ci;
   }
 
@@ -317,6 +329,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     cr.setRetryRetention(ci.retryRetention);
     cr.setPoolname(ci.poolName);
     cr.setNumberOfBuckets(ci.numberOfBuckets);
+    cr.setOrderByClause(ci.orderByClause);
     return cr;
   }
 
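[Editor's note] The two conversion hunks above follow the existing pattern for optional Thrift fields: compactionStructToInfo() copies orderByClause only when the struct reports it as set, while compactionInfoToStruct() sets it unconditionally. Below is a rough, self-contained sketch of that asymmetry using stand-in classes; it is not the generated Thrift code, just the shape of the guard.

    // Stand-ins for the generated CompactionInfoStruct / CompactionInfo types; names are illustrative only.
    class InfoStruct {
      private String orderByClause;
      void setOrderByClause(String v) { orderByClause = v; }
      boolean isSetOrderByClause()    { return orderByClause != null; }
      String getOrderByClause()       { return orderByClause; }
    }

    class Info {
      String orderByClause;

      // struct -> info: copy only when the optional field was actually set,
      // so an unset field does not overwrite the default (null) value.
      static Info fromStruct(InfoStruct s) {
        Info ci = new Info();
        if (s.isSetOrderByClause()) {
          ci.orderByClause = s.getOrderByClause();
        }
        return ci;
      }

      // info -> struct: set unconditionally; a null simply round-trips as null.
      static InfoStruct toStruct(Info ci) {
        InfoStruct s = new InfoStruct();
        s.setOrderByClause(ci.orderByClause);
        return s;
      }

      public static void main(String[] args) {
        InfoStruct s = new InfoStruct();
        s.setOrderByClause("ORDER BY a, b DESC");
        System.out.println(fromStruct(s).orderByClause);
      }
    }
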
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 927833f8d48..12851547693 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -67,7 +67,7 @@ class CompactionTxnHandler extends TxnHandler {
           + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", "
           + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
           + "\"CQ_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", "
-          + "\"CQ_NUMBER_OF_BUCKETS\" "
+          + "\"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\" "
           + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_TXN_ID\" = ?";
   private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY =
       "SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " +
@@ -247,8 +247,8 @@ class CompactionTxnHandler extends TxnHandler {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
         StringBuilder sb = new StringBuilder();
         sb.append("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
-          "\"CQ_TYPE\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" " +
-          "WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "' AND ");
+          "\"CQ_TYPE\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\", " +
+            "\"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "' AND ");
         boolean hasPoolName = StringUtils.isNotBlank(rqst.getPoolName());
         if(hasPoolName) {
           sb.append("\"CQ_POOL_NAME\"=?");
@@ -279,7 +279,8 @@ class CompactionTxnHandler extends TxnHandler {
           info.type = TxnUtils.dbCompactionType2ThriftType(rs.getString(5).charAt(0));
           info.poolName = rs.getString(6);
           info.numberOfBuckets = rs.getInt(7);
-          info.properties = rs.getString(8);
+          info.orderByClause = rs.getString(8);
+          info.properties = rs.getString(9);
           info.workerId = rqst.getWorkerId();
 
           String workerId = rqst.getWorkerId();
@@ -577,13 +578,15 @@ class CompactionTxnHandler extends TxnHandler {
             + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
             + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", \"CC_ENQUEUE_TIME\", "
             + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", "
-            + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\") "
+            + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
+            + "\"CC_ORDER_BY\") "
           + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
             + quoteChar(SUCCEEDED_STATE) + ", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", "
             + getEpochFn(dbProduct) + ", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
             + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", \"CQ_ENQUEUE_TIME\", "
             + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
-            + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\" "
+            + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
+            + "\"CQ_ORDER_BY\" "
             + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
         pStmt = dbConn.prepareStatement(s);
         pStmt.setLong(1, info.id);
@@ -1404,7 +1407,7 @@ class CompactionTxnHandler extends TxnHandler {
                 + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", "
                 + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
                 + "\"CQ_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", "
-                + "\"CQ_NUMBER_OF_BUCKETS\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?");
+                + "\"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?");
         pStmt.setLong(1, ci.id);
         rs = pStmt.executeQuery();
         if (rs.next()) {
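
[Editor's note] In the findNextToCompact query above, CQ_ORDER_BY is selected at position 8, which pushes CQ_TBLPROPERTIES to position 9; the ResultSet accessors are positional, so the SELECT list and the rs.getXxx(n) indices have to shift together. A minimal sketch of that coupling is below, assuming a simplified row shape (the column subset and the POJO are illustrative, not the real CompactionInfo).

    import java.sql.ResultSet;
    import java.sql.SQLException;

    class NextCompaction {
      long id;
      String poolName;
      String orderByClause;
      String properties;
    }

    class NextCompactionReader {
      // Column positions must mirror the SELECT list:
      // SELECT CQ_ID, CQ_POOL_NAME, CQ_ORDER_BY, CQ_TBLPROPERTIES ...
      static NextCompaction read(ResultSet rs) throws SQLException {
        NextCompaction info = new NextCompaction();
        info.id = rs.getLong(1);
        info.poolName = rs.getString(2);
        info.orderByClause = rs.getString(3);  // newly inserted column...
        info.properties = rs.getString(4);     // ...shifts every later index by one
        return info;
      }
    }
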
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index ab1db0f8201..bf071327434 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1567,7 +1567,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           assert true;
         }
 
-        if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn) {
+        if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn && !MetaStoreServerUtils.isCompactionTxn(txnType)) {
           moveTxnComponentsToCompleted(stmt, txnid, isUpdateDelete);
         } else if (isReplayedReplTxn) {
           if (rqst.isSetWriteEventInfos()) {
@@ -3767,6 +3767,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             if (rqst.isSetNumberOfBuckets()) {
               buf.append(", \"CQ_NUMBER_OF_BUCKETS\"");
             }
+            if (rqst.isSetOrderByClause()) {
+              buf.append(", \"CQ_ORDER_BY\"");
+            }
             if (rqst.getProperties() != null) {
               buf.append(", \"CQ_TBLPROPERTIES\"");
             }
@@ -3802,6 +3805,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             if (rqst.isSetNumberOfBuckets()) {
               buf.append(", ").append(rqst.getNumberOfBuckets());
             }
+            if (rqst.isSetOrderByClause()) {
+              buf.append(", ?");
+              params.add(rqst.getOrderByClause());
+            }
             if (rqst.getProperties() != null) {
               buf.append(", ?");
               params.add(new StringableMap(rqst.getProperties()).toString());
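
[Editor's note] The TxnHandler hunks above extend the dynamically built INSERT into COMPACTION_QUEUE: the column list gains "CQ_ORDER_BY" and the VALUES clause gains a bound parameter only when the request carries an ORDER BY clause (numberOfBuckets, by contrast, is appended as a numeric literal). A rough sketch of that optional-column pattern, with hypothetical names (Request, buildInsert) rather than the real TxnHandler code:

    import java.util.ArrayList;
    import java.util.List;

    class Request {
      Integer numberOfBuckets;   // null means "not set"
      String orderByClause;      // null means "not set"
    }

    class InsertBuilder {
      // Returns the SQL text; bound values are collected into params in the same order
      // as their '?' placeholders, mirroring how the patch appends column and value together.
      static String buildInsert(Request rqst, List<Object> params) {
        StringBuilder cols = new StringBuilder("INSERT INTO COMPACTION_QUEUE (CQ_ID");
        StringBuilder vals = new StringBuilder(") VALUES (?");
        params.add(1L);  // placeholder id, just for the sketch
        if (rqst.numberOfBuckets != null) {
          cols.append(", CQ_NUMBER_OF_BUCKETS");
          vals.append(", ").append(rqst.numberOfBuckets);  // numeric literal, no bind parameter
        }
        if (rqst.orderByClause != null) {
          cols.append(", CQ_ORDER_BY");
          vals.append(", ?");                              // user-supplied text goes through a bind parameter
          params.add(rqst.orderByClause);
        }
        return cols.append(vals).append(")").toString();
      }

      public static void main(String[] args) {
        Request r = new Request();
        r.orderByClause = "ORDER BY a";
        List<Object> params = new ArrayList<>();
        System.out.println(buildInsert(r, params) + "  -- params: " + params);
      }
    }
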
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
index d487752f1a3..5c417893590 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator;
 import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory;
@@ -1617,4 +1618,8 @@ public class MetaStoreServerUtils {
     }
     return null;
   }
+
+  public static boolean isCompactionTxn(TxnType txnType) {
+    return TxnType.COMPACTION.equals(txnType) || TxnType.REBALANCE_COMPACTION.equals(txnType);
+  }
 }
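
[Editor's note] isCompactionTxn() above groups the two compaction transaction types so callers such as the commitTxn path (see the earlier TxnHandler hunk) can treat regular and rebalance compactions the same way when deciding to skip moveTxnComponentsToCompleted. A trivial usage sketch against a stand-in enum; TxnKind is a stand-in for the real TxnType:

    // Stand-in for org.apache.hadoop.hive.metastore.api.TxnType; only the relevant constants.
    enum TxnKind { DEFAULT, READ_ONLY, COMPACTION, REBALANCE_COMPACTION }

    class TxnKindCheck {
      static boolean isCompactionTxn(TxnKind t) {
        return t == TxnKind.COMPACTION || t == TxnKind.REBALANCE_COMPACTION;
      }

      public static void main(String[] args) {
        for (TxnKind t : TxnKind.values()) {
          // Only non-read-only, non-compaction txns would move their components to COMPLETED_TXN_COMPONENTS.
          boolean moveComponents = t != TxnKind.READ_ONLY && !isCompactionTxn(t);
          System.out.println(t + " -> moveComponents=" + moveComponents);
        }
      }
    }
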
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index cfac2d4a717..f68b1400ba9 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -636,7 +636,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_CLEANER_START bigint,
   CQ_RETRY_RETENTION bigint NOT NULL DEFAULT 0,
   CQ_POOL_NAME varchar(128),
-  CQ_NUMBER_OF_BUCKETS integer
+  CQ_NUMBER_OF_BUCKETS integer,
+  CQ_ORDER_BY varchar(4000)
 );
 
 CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
@@ -668,7 +669,8 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_INITIATOR_VERSION varchar(128),
   CC_WORKER_VERSION varchar(128),
   CC_POOL_NAME varchar(128),
-  CC_NUMBER_OF_BUCKETS integer
+  CC_NUMBER_OF_BUCKETS integer,
+  CC_ORDER_BY varchar(4000)
 );
 
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
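
[Editor's note] The same CQ_ORDER_BY / CC_ORDER_BY pair is added to COMPACTION_QUEUE and COMPLETED_COMPACTIONS across all supported backends in the schema and upgrade scripts of this patch (the upgrade scripts also correct the HIVE-26719 column in COMPLETED_COMPACTIONS to CC_NUMBER_OF_BUCKETS). Both new columns are varchar(4000); purely as an illustration of that width (this guard is not part of the patch), a caller could check the clause length before it is written to the queue:

    class OrderByClauseCheck {
      // Matches the varchar(4000) width used for CQ_ORDER_BY / CC_ORDER_BY in the schemas below.
      private static final int MAX_ORDER_BY_LENGTH = 4000;

      // Hypothetical pre-check, not part of the patch: reject clauses that would not fit the column.
      static String validated(String orderByClause) {
        if (orderByClause != null && orderByClause.length() > MAX_ORDER_BY_LENGTH) {
          throw new IllegalArgumentException(
              "ORDER BY clause longer than " + MAX_ORDER_BY_LENGTH + " characters: " + orderByClause.length());
        }
        return orderByClause;
      }
    }
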
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql
index 8e11eefec12..498988f7fab 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql
@@ -6,7 +6,11 @@ ALTER TABLE "APP"."PART_COL_STATS" ADD HISTOGRAM BLOB;
 
 -- HIVE-26719
 ALTER TABLE COMPACTION_QUEUE ADD CQ_NUMBER_OF_BUCKETS INTEGER;
-ALTER TABLE COMPLETED_COMPACTIONS ADD CQ_NUMBER_OF_BUCKETS INTEGER;
+ALTER TABLE COMPLETED_COMPACTIONS ADD CC_NUMBER_OF_BUCKETS INTEGER;
+
+-- HIVE-26735
+ALTER TABLE COMPACTION_QUEUE ADD CQ_ORDER_BY VARCHAR(4000);
+ALTER TABLE COMPLETED_COMPACTIONS ADD CC_ORDER_BY VARCHAR(4000);
 
 -- HIVE-26704
 CREATE TABLE MIN_HISTORY_WRITE_ID (
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index a3270b35ad2..426a5b961ba 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1056,6 +1056,7 @@ CREATE TABLE COMPACTION_QUEUE(
 	CQ_RETRY_RETENTION bigint NOT NULL DEFAULT 0,
     CQ_POOL_NAME nvarchar(128) NULL,
     CQ_NUMBER_OF_BUCKETS integer,
+    CQ_ORDER_BY varchar(4000),
 PRIMARY KEY CLUSTERED
 (
 	CQ_ID ASC
@@ -1087,6 +1088,7 @@ CREATE TABLE COMPLETED_COMPACTIONS (
     CC_WORKER_VERSION nvarchar(128) NULL,
     CC_POOL_NAME nvarchar(128) NULL,
     CC_NUMBER_OF_BUCKETS integer,
+    CC_ORDER_BY varchar(4000),
 PRIMARY KEY CLUSTERED
 (
 	CC_ID ASC
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql
index f3a611cc2e9..85c9a85c846 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql
@@ -6,7 +6,11 @@ ALTER TABLE PART_COL_STATS ADD HISTOGRAM varbinary(max);
 
 -- HIVE-26719
 ALTER TABLE COMPACTION_QUEUE ADD CQ_NUMBER_OF_BUCKETS INTEGER NULL;
-ALTER TABLE COMPLETED_COMPACTIONS ADD CQ_NUMBER_OF_BUCKETS INTEGER NULL;
+ALTER TABLE COMPLETED_COMPACTIONS ADD CC_NUMBER_OF_BUCKETS INTEGER NULL;
+
+-- HIVE-26735
+ALTER TABLE COMPACTION_QUEUE ADD CQ_ORDER_BY VARCHAR(4000);
+ALTER TABLE COMPLETED_COMPACTIONS ADD CC_ORDER_BY VARCHAR(4000);
 
 -- HIVE-26704
 CREATE TABLE MIN_HISTORY_WRITE_ID (
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 89ae964f9aa..5983f9cd2a3 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1097,7 +1097,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_CLEANER_START bigint,
   CQ_RETRY_RETENTION bigint NOT NULL DEFAULT 0,
   CQ_POOL_NAME varchar(128),
-  CQ_NUMBER_OF_BUCKETS integer
+  CQ_NUMBER_OF_BUCKETS integer,
+  CQ_ORDER_BY varchar(4000)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
 CREATE TABLE COMPLETED_COMPACTIONS (
@@ -1124,7 +1125,8 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_INITIATOR_VERSION varchar(128),
   CC_WORKER_VERSION varchar(128),
   CC_POOL_NAME varchar(128),
-  CC_NUMBER_OF_BUCKETS integer
+  CC_NUMBER_OF_BUCKETS integer,
+  CC_ORDER_BY varchar(4000)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql
index 6f2c3ca2002..db6ee535290 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql
@@ -6,7 +6,11 @@ ALTER TABLE PART_COL_STATS ADD HISTOGRAM blob;
 
 -- HIVE-26719
 ALTER TABLE `COMPACTION_QUEUE` ADD COLUMN `CQ_NUMBER_OF_BUCKETS` INTEGER;
-ALTER TABLE `COMPLETED_COMPACTIONS` ADD COLUMN `CQ_NUMBER_OF_BUCKETS` INTEGER;
+ALTER TABLE `COMPLETED_COMPACTIONS` ADD COLUMN `CC_NUMBER_OF_BUCKETS` INTEGER;
+
+-- HIVE-26735
+ALTER TABLE `COMPACTION_QUEUE` ADD COLUMN `CQ_ORDER_BY` VARCHAR(4000);
+ALTER TABLE `COMPLETED_COMPACTIONS` ADD COLUMN `CC_ORDER_BY` VARCHAR(4000);
 
 -- HIVE-26704
 CREATE TABLE MIN_HISTORY_WRITE_ID (
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index bee4717c959..d31431538df 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1099,7 +1099,8 @@ CREATE TABLE COMPACTION_QUEUE (
   CQ_CLEANER_START NUMBER(19),
   CQ_RETRY_RETENTION NUMBER(19) DEFAULT 0 NOT NULL,
   CQ_POOL_NAME varchar(128),
-  CQ_NUMBER_OF_BUCKETS integer
+  CQ_NUMBER_OF_BUCKETS integer,
+  CQ_ORDER_BY varchar(4000)
 ) ROWDEPENDENCIES;
 
 CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
@@ -1131,7 +1132,8 @@ CREATE TABLE COMPLETED_COMPACTIONS (
   CC_INITIATOR_VERSION varchar(128),
   CC_WORKER_VERSION varchar(128),
   CC_POOL_NAME varchar(128),
-  CC_NUMBER_OF_BUCKETS integer
+  CC_NUMBER_OF_BUCKETS integer,
+  CC_ORDER_BY varchar(4000)
 ) ROWDEPENDENCIES;
 
 CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION);
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql
index 90c96a56b73..3c57e9f912a 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql
@@ -6,7 +6,11 @@ ALTER TABLE PART_COL_STATS ADD HISTOGRAM BLOB;
 
 -- HIVE-26719
 ALTER TABLE COMPACTION_QUEUE ADD CQ_NUMBER_OF_BUCKETS INTEGER;
-ALTER TABLE COMPLETED_COMPACTIONS ADD CQ_NUMBER_OF_BUCKETS INTEGER;
+ALTER TABLE COMPLETED_COMPACTIONS ADD CC_NUMBER_OF_BUCKETS INTEGER;
+
+-- HIVE-26735
+ALTER TABLE COMPACTION_QUEUE ADD CQ_ORDER_BY VARCHAR(4000);
+ALTER TABLE COMPLETED_COMPACTIONS ADD CC_ORDER_BY VARCHAR(4000);
 
 -- HIVE-26704
 CREATE TABLE MIN_HISTORY_WRITE_ID (
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index d52060f3418..502125222da 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1814,7 +1814,8 @@ CREATE TABLE "COMPACTION_QUEUE" (
   "CQ_CLEANER_START" bigint,
   "CQ_RETRY_RETENTION" bigint not null default 0,
   "CQ_POOL_NAME" varchar(128),
-  "CQ_NUMBER_OF_BUCKETS" integer
+  "CQ_NUMBER_OF_BUCKETS" integer,
+  "CQ_ORDER_BY" varchar(4000)
 );
 
 CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" (
@@ -1846,7 +1847,8 @@ CREATE TABLE "COMPLETED_COMPACTIONS" (
   "CC_INITIATOR_VERSION" varchar(128),
   "CC_WORKER_VERSION" varchar(128),
   "CC_POOL_NAME" varchar(128),
-  "CC_NUMBER_OF_BUCKETS" integer
+  "CC_NUMBER_OF_BUCKETS" integer,
+  "CC_ORDER_BY" varchar(4000)
 );
 
 CREATE INDEX "COMPLETED_COMPACTIONS_RES" ON "COMPLETED_COMPACTIONS" ("CC_DATABASE","CC_TABLE","CC_PARTITION");
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql
index ff384c681b9..870b5bb4b26 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql
@@ -6,7 +6,11 @@ ALTER TABLE "PART_COL_STATS" ADD "HISTOGRAM" bytea;
 
 -- HIVE-26719
 ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NUMBER_OF_BUCKETS" INTEGER;
-ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CQ_NUMBER_OF_BUCKETS" INTEGER;
+ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_NUMBER_OF_BUCKETS" INTEGER;
+
+-- HIVE-26735
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_ORDER_BY" VARCHAR(4000);
+ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_ORDER_BY" VARCHAR(4000);
 
 -- HIVE-26704
 CREATE TABLE "MIN_HISTORY_WRITE_ID" (
diff --git a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
index 5cf7fadd3a4..b70634aefb3 100644
--- a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
@@ -145,7 +145,11 @@ ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_POOL_NAME" VARCHAR(128);
 
 -- HIVE-26719
 ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NUMBER_OF_BUCKETS" INTEGER;
-ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CQ_NUMBER_OF_BUCKETS" INTEGER;
+ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_NUMBER_OF_BUCKETS" INTEGER;
+
+-- HIVE-26735
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_ORDER_BY" VARCHAR(4000);
+ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_ORDER_BY" VARCHAR(4000);
 
 -- HIVE-26704
 CREATE TABLE "MIN_HISTORY_WRITE_ID" (