You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ku...@apache.org on 2020/08/04 10:36:01 UTC

[hive] branch master updated: HIVE-23763: Query based minor compaction produces wrong files when ro… (#1327)

This is an automated email from the ASF dual-hosted git repository.

kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 98c925c  HIVE-23763: Query based minor compaction produces wrong files when ro… (#1327)
98c925c is described below

commit 98c925c8cb52e410c9aa02a499d2ac15a6d6777b
Author: kuczoram <ku...@cloudera.com>
AuthorDate: Tue Aug 4 12:35:52 2020 +0200

    HIVE-23763: Query based minor compaction produces wrong files when ro… (#1327)
    
    * HIVE-23763: Query based minor compaction produces wrong files when rows with different buckets Ids are processed by the same FileSinkOperator
    
    * HIVE-23763: Fix the MM compaction tests
    
    * HIVE-23763: Address the review comments
---
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  | 150 ++++++++++-
 .../hive/ql/txn/compactor/CompactorTestUtil.java   |  13 +-
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   | 290 ++++++++++++++++++++-
 .../ql/txn/compactor/TestMmCompactorOnTez.java     |  18 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  64 +++--
 .../hadoop/hive/ql/exec/ReduceSinkOperator.java    |  14 +
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |  21 +-
 .../opconventer/HiveOpConverterUtils.java          |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  25 +-
 .../org/apache/hadoop/hive/ql/plan/PlanUtils.java  |   3 +-
 .../apache/hadoop/hive/ql/plan/ReduceSinkDesc.java |   9 +
 .../hive/ql/txn/compactor/QueryCompactor.java      |  12 +
 12 files changed, 551 insertions(+), 70 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 71232de..08b9039 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -35,6 +35,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults;
 import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
@@ -53,6 +55,7 @@ public class CompactorOnTezTest {
   protected IMetaStoreClient msClient;
   protected IDriver driver;
   protected boolean runsOnTez = true;
+  protected boolean mmCompaction = false;
 
   @Before
   // Note: we create a new conf and driver object before every test
@@ -95,6 +98,17 @@ public class CompactorOnTezTest {
     conf.set("hive.tez.container.size", "128");
     conf.setBoolean("hive.merge.tezfiles", false);
     conf.setBoolean("hive.in.tez.test", true);
+    if (!mmCompaction) {
+      // We need these settings to create a table which is not bucketed, but contains multiple files.
+      // If these parameters are set when inserting 100 rows into the table, the rows will
+      // be distributed into multiple files. This setup is used in the testMinorCompactionWithoutBuckets,
+      // testMinorCompactionWithoutBucketsInsertOverwrite and testMajorCompactionWithoutBucketsInsertAndDeleteInsertOverwrite
+      // tests in the TestCrudCompactorOnTez class.
+      // This use case has to be tested because of HIVE-23763. The MM compaction is not affected by this issue,
+      // therefore no need to set these configs for MM compaction.
+      conf.set("tez.grouping.max-size", "1024");
+      conf.set("tez.grouping.min-size", "1");
+    }
   }
 
   @After public void tearDown() {
@@ -218,6 +232,77 @@ public class CompactorOnTezTest {
     }
 
     /**
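+     * Creates a transactional, non-bucketed ORC table in which the data is distributed
+     * into multiple splits. The initial load is 100 rows (ids 0-99) and it should be split into
+     * multiple files, like bucket_000001, bucket_000002, ...
+     * This is needed because the MINOR compactions had issues with tables like this. (HIVE-23763)
+     * @param dbName database used to qualify both table names; may be null to leave them unqualified
+     * @param tblName name of the table that is dropped, re-created and filled
+     * @param tempTblName base name of the external text tables used for staging (suffixed with _1 ... _4)
+     * @param createDeletes if true, four delete statements are run after the initial load
+     * @param createInserts if true, three more batches (ids 100-249, 300-317 and 400-409) are inserted
+     * @param insertOverwrite if true, the initial load uses "insert overwrite" instead of "insert into"
+     * @throws Exception if executing any of the statements fails
+     */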
+    void createTableWithoutBucketWithMultipleSplits(String dbName, String tblName, String tempTblName,
+        boolean createDeletes, boolean createInserts, boolean insertOverwrite) throws Exception {
+      // Qualify both the target and the staging table names when a database is given.
+      final String target = dbName == null ? tblName : dbName + "." + tblName;
+      final String staging = dbName == null ? tempTblName : dbName + "." + tempTblName;
+
+      executeStatementOnDriver("drop table if exists " + target, driver);
+      String createTable = "create table " + target + " (a string, b string, c string)"
+          + " stored as orc"
+          + " TBLPROPERTIES('transactional'='true',"
+          + " 'transactional_properties'='default')";
+      executeStatementOnDriver(createTable, driver);
+
+      // Initial load: ids 0..99, staged through <tempTblName>_1.
+      generateInsertsWithMultipleSplits(0, 100, target, staging + "_1", insertOverwrite);
+
+      if (createDeletes) {
+        // Four separate statements, i.e. four separate delete transactions.
+        String deletePrefix = "delete from " + target + " where a in ";
+        executeStatementOnDriver(deletePrefix + "('41','87','53','11')", driver);
+        executeStatementOnDriver(deletePrefix + "('42','88','81','12','86')", driver);
+        executeStatementOnDriver(deletePrefix + "('98')", driver);
+        executeStatementOnDriver(deletePrefix + "('40')", driver);
+      }
+      if (createInserts) {
+        generateInsertsWithMultipleSplits(100, 250, target, staging + "_2", false);
+        generateInsertsWithMultipleSplits(300, 318, target, staging + "_3", false);
+        generateInsertsWithMultipleSplits(400, 410, target, staging + "_4", false);
+      }
+    }
+
+    private void generateInsertsWithMultipleSplits(int begin, int end, String tableName, String tempTableName,
+        boolean insertOverwrite) throws Exception {
+      // Builds the comma separated tuples ('i','value<i>','<comment> <i>') for every i in [begin, end).
+      // The buffer never leaves this method, so the unsynchronized StringBuilder is the right tool.
+      StringBuilder values = new StringBuilder();
+      for (int i = begin; i < end; i++) {
+        if (i > begin) {
+          values.append(",");
+        }
+        values.append("('").append(i);
+        values.append("','value").append(i);
+        // The long third column only exists to make the rows, and therefore the files, bigger.
+        values.append("','this is some comment to increase the file size ").append(i);
+        values.append("')");
+      }
+
+      // Stage the rows in a freshly created external text table first.
+      executeStatementOnDriver("DROP TABLE IF EXISTS " + tempTableName, driver);
+      executeStatementOnDriver(
+          "CREATE EXTERNAL TABLE " + tempTableName + " (id string, value string, comment string) STORED AS TEXTFILE ",
+          driver);
+      executeStatementOnDriver("insert into " + tempTableName + " values " + values, driver);
+      // Copy the staged rows into the target table, replacing its content when insertOverwrite is set.
+      String insertClause = insertOverwrite ? "insert overwrite table " : "insert into ";
+      executeStatementOnDriver(insertClause + tableName + " select * from " + tempTableName, driver);
+    }
+
+    /**
      * 5 txns.
      */
     void insertMmTestData(String tblName) throws Exception {
@@ -261,22 +346,77 @@ public class CompactorOnTezTest {
     }
 
     List<String> getAllData(String tblName) throws Exception {
-      return getAllData(null, tblName);
+      return getAllData(null, tblName, false);
+    }
+
+    List<String> getAllData(String tblName, boolean withRowId) throws Exception {
+      return getAllData(null, tblName, withRowId);
     }
 
-    List<String> getAllData(String dbName, String tblName) throws Exception {
+    List<String> getAllData(String dbName, String tblName, boolean withRowId) throws Exception {
       if (dbName != null) {
         tblName = dbName + "." + tblName;
       }
-      List<String> result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver);
+      StringBuffer query = new StringBuffer();
+      query.append("select ");
+      if (withRowId) {
+        query.append("ROW__ID, ");
+      }
+      query.append("* from ");
+      query.append(tblName);
+      List<String> result = executeStatementOnDriverAndReturnResults(query.toString(), driver);
       Collections.sort(result);
       return result;
     }
 
+    List<String> getDataWithInputFileNames(String dbName, String tblName) throws Exception {
+      // Returns the INPUT__FILE__NAME and the "a" column of every row, ordered by "a", so that the
+      // results taken before and after a compaction can be compared position by position
+      // (see compareFileNames, which splits each returned row on a tab).
+      String qualifiedName = tblName;
+      if (dbName != null) {
+        qualifiedName = dbName + "." + tblName;
+      }
+      // A fixed, single-threaded query text: plain concatenation instead of a synchronized StringBuffer.
+      String query = "select INPUT__FILE__NAME, a from " + qualifiedName + " order by a";
+      return executeStatementOnDriverAndReturnResults(query, driver);
+    }
+
+    boolean compareFileNames(List<String> expectedFileNames, List<String> actualFileNames) {
+      // Each entry is split on a tab: part 0 is treated as the file path, part 1 as the row's value.
+      final int rowCount = expectedFileNames.size();
+      if (rowCount != actualFileNames.size()) {
+        return false;
+      }
+
+      // Group 2 is the bare bucket name; a trailing numeric suffix such as "_0" is left out of it.
+      final Pattern bucketFile = Pattern.compile("(.*)(bucket_[0-9]+)(_[0-9]+)?");
+      int row = 0;
+      while (row < rowCount) {
+        final String[] expectedRow = expectedFileNames.get(row).split("\t");
+        final String[] actualRow = actualFileNames.get(row).split("\t");
+
+        // The same position has to carry the same value on both sides ...
+        if (!expectedRow[1].equals(actualRow[1])) {
+          return false;
+        }
+
+        // ... and that value must come from a file with the same bucket name on both sides.
+        final Matcher expectedMatch = bucketFile.matcher(expectedRow[0]);
+        final Matcher actualMatch = bucketFile.matcher(actualRow[0]);
+        final String expectedBucket = expectedMatch.matches() ? expectedMatch.group(2) : null;
+        final String actualBucket = actualMatch.matches() ? actualMatch.group(2) : null;
+        if (expectedBucket == null || !expectedBucket.equals(actualBucket)) {
+          return false;
+        }
+        row++;
+      }
+      return true;
+    }
+
     protected List<String> getBucketData(String tblName, String bucketId) throws Exception {
       return executeStatementOnDriverAndReturnResults(
-          "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order"
-              + " by a, b", driver);
+          "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID, a, b", driver);
     }
 
     protected void dropTable(String tblName) throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index 3ca5d4c..9db2229 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -95,8 +95,16 @@ class CompactorTestUtil {
       throws IOException {
     Path path = partitionName == null ? new Path(table.getSd().getLocation(), deltaName) : new Path(
         new Path(table.getSd().getLocation()), new Path(partitionName, deltaName));
-    return Arrays.stream(fs.listStatus(path, AcidUtils.hiddenFileFilter)).map(FileStatus::getPath).map(Path::getName)
-        .sorted().collect(Collectors.toList());
+    return Arrays.stream(fs.listStatus(path, AcidUtils.bucketFileFilter)).map(FileStatus::getPath).map(Path::getName).sorted()
+        .collect(Collectors.toList());
+  }
+
+  static List<String> getBucketFileNamesForMMTables(FileSystem fs, Table table, String partitionName, String deltaName)
+      throws IOException {
+    // Resolve <table location>[/<partition>]/<delta> and return the sorted names of the files the filter accepts.
+    Path relative = partitionName == null ? new Path(deltaName) : new Path(partitionName, deltaName);
+    FileStatus[] files = fs.listStatus(new Path(new Path(table.getSd().getLocation()), relative), AcidUtils.hiddenFileFilter);
+    return Arrays.stream(files).map(status -> status.getPath().getName()).sorted().collect(Collectors.toList());
   }
 
   /**
@@ -160,6 +168,7 @@ class CompactorTestUtil {
       throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + e);
     }
     List<String> rs = new ArrayList<>();
+    driver.setMaxRows(400);
     driver.getResults(rs);
     return rs;
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 242e1cb..bba5278 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -21,7 +21,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
@@ -260,8 +264,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
         "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow",
         "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday",
         "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday",
-        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday",
         "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday",
+        "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday",
         "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday",
         "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday",
         "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday"));
@@ -351,8 +355,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
         "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow",
         "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday",
-        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday");
@@ -498,6 +502,261 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
   }
 
   @Test
+  public void testMinorCompactionWithoutBuckets() throws Exception {
+    // Plain "insert into" load, so the initial insert leaves its own delta_0000001_0000001 directory.
+    final String dbName = "default";
+    final String tableName = "testMinorCompaction_wobuckets_1";
+    final String tempTableName = "tmp_txt_table_1";
+
+    List<String> expectedDeltas = Arrays.asList(
+        "delta_0000001_0000001_0000",
+        "delta_0000006_0000006_0000",
+        "delta_0000007_0000007_0000",
+        "delta_0000008_0000008_0000");
+    List<String> expectedDeleteDeltas = Arrays.asList(
+        "delete_delta_0000002_0000002_0000",
+        "delete_delta_0000003_0000003_0000",
+        "delete_delta_0000004_0000004_0000",
+        "delete_delta_0000005_0000005_0000");
+
+    testMinorCompactionWithoutBucketsCommon(dbName, tableName, tempTableName, false, expectedDeltas,
+        expectedDeleteDeltas, "delta_0000001_0000008_v0000025", CompactionType.MINOR);
+  }
+
+  @Test
+  public void testMinorCompactionWithoutBucketsInsertOverwrite() throws Exception {
+    // The initial load is an "insert overwrite": no delta_0000001_0000001 directory is expected here.
+    final String dbName = "default";
+    final String tableName = "testMinorCompaction_wobuckets_2";
+    final String tempTableName = "tmp_txt_table_2";
+
+    List<String> expectedDeltas = Arrays.asList(
+        "delta_0000006_0000006_0000",
+        "delta_0000007_0000007_0000",
+        "delta_0000008_0000008_0000");
+    List<String> expectedDeleteDeltas = Arrays.asList(
+        "delete_delta_0000002_0000002_0000",
+        "delete_delta_0000003_0000003_0000",
+        "delete_delta_0000004_0000004_0000",
+        "delete_delta_0000005_0000005_0000");
+
+    testMinorCompactionWithoutBucketsCommon(dbName, tableName, tempTableName, true, expectedDeltas,
+        expectedDeleteDeltas, "delta_0000001_0000008_v0000025", CompactionType.MINOR);
+  }
+
+  @Test
+  public void testMajorCompactionWithoutBucketsInsertAndDeleteInsertOverwrite() throws Exception {
+    // Same "insert overwrite" setup as the minor variant, but compacted with MAJOR into a base directory.
+    final String dbName = "default";
+    final String tableName = "testMinorCompaction_wobuckets_3";
+    final String tempTableName = "tmp_txt_table_3";
+
+    List<String> expectedDeltas = Arrays.asList(
+        "delta_0000006_0000006_0000",
+        "delta_0000007_0000007_0000",
+        "delta_0000008_0000008_0000");
+    List<String> expectedDeleteDeltas = Arrays.asList(
+        "delete_delta_0000002_0000002_0000",
+        "delete_delta_0000003_0000003_0000",
+        "delete_delta_0000004_0000004_0000",
+        "delete_delta_0000005_0000005_0000");
+
+    testMinorCompactionWithoutBucketsCommon(dbName, tableName, tempTableName, true, expectedDeltas,
+        expectedDeleteDeltas, "base_0000008_v0000025", CompactionType.MAJOR);
+  }
+
+  private void testMinorCompactionWithoutBucketsCommon(String dbName, String tableName, String tempTableName,
+      boolean insertOverWrite, List<String> expectedDeltas, List<String> expectedDeleteDeltas,
+      String expectedCompactedDeltaDirName, CompactionType compactionType) throws Exception {
+    // Shared scenario: build the table, compact it, then check directories, bucket file names and rows.
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createTableWithoutBucketWithMultipleSplits(dbName, tableName, tempTableName, true, true,
+        insertOverWrite);
+
+    FileSystem fs = FileSystem.get(conf);
+    Table table = msClient.getTable(dbName, tableName);
+
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    List<String> expectedFileNames = dataProvider.getDataWithInputFileNames(null, tableName);
+
+    // Verify the delta directories that exist before the compaction
+    Assert.assertEquals("Delta directories does not match", expectedDeltas,
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    // Verify the delete delta directories that exist before the compaction
+    Assert.assertEquals("Delete directories does not match", expectedDeleteDeltas,
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null));
+
+    Set<String> expectedDeleteBucketFilesSet = new HashSet<>();
+    for (String expectedDeleteDelta : expectedDeleteDeltas) {
+      expectedDeleteBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDeleteDelta));
+    }
+    List<String> expectedDeleteBucketFiles = new ArrayList<>(expectedDeleteBucketFilesSet);
+    Collections.sort(expectedDeleteBucketFiles);
+
+    Set<String> expectedBucketFilesSet = new HashSet<>();
+    for (String expectedDelta : expectedDeltas) {
+      expectedBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDelta));
+    }
+    List<String> expectedBucketFiles = new ArrayList<>();
+    for (String expectedBucketFile : expectedBucketFilesSet) {
+      Pattern p = Pattern.compile("(bucket_[0-9]+)(_[0-9]+)?");
+      Matcher m = p.matcher(expectedBucketFile);
+      if (m.matches()) {
+        expectedBucketFiles.add(m.group(1));
+      }
+    }
+    Collections.sort(expectedBucketFiles);
+
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true);
+    // Run the cleaner; the directory checks below expect only the compacted output to be listed
+    CompactorTestUtil.runCleaner(conf);
+
+    // Only 1 compaction should be in the response queue with succeeded state
+    List<ShowCompactResponseElement> compacts =
+        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
+    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+
+    // Verify the base (MAJOR) or the delta and delete delta (MINOR) directories after compaction
+    if (CompactionType.MAJOR == compactionType) {
+      List<String> actualBasesAfterComp =
+          CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null);
+      Assert.assertEquals("Base directory does not match after compaction",
+          Collections.singletonList(expectedCompactedDeltaDirName), actualBasesAfterComp);
+      // Verify bucket files in the base dir
+      Assert.assertEquals("Bucket names are not matching after compaction in the base folder",
+          expectedBucketFiles, CompactorTestUtil.getBucketFileNames(fs, table, null, actualBasesAfterComp.get(0)));
+    } else {
+      List<String> actualDeltasAfterComp =
+          CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+      Assert.assertEquals("Delta directories does not match after compaction",
+          Collections.singletonList(expectedCompactedDeltaDirName), actualDeltasAfterComp);
+      List<String> actualDeleteDeltasAfterComp =
+          CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null);
+      Assert.assertEquals("Delete delta directories does not match after compaction",
+          Collections.singletonList("delete_" + expectedCompactedDeltaDirName), actualDeleteDeltasAfterComp);
+      // Verify bucket files in delta and delete delta dirs
+      Assert.assertEquals("Bucket names are not matching after compaction in the delete deltas",
+          expectedDeleteBucketFiles,
+          CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0)));
+      Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+          CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+    }
+
+    // Verify all contents, and that every row is still read from a file with the same bucket name
+    List<String> actualData = dataProvider.getAllData(tableName);
+    Assert.assertEquals(expectedData, actualData);
+    List<String> actualFileNames = dataProvider.getDataWithInputFileNames(null, tableName);
+    Assert.assertTrue(dataProvider.compareFileNames(expectedFileNames, actualFileNames));
+    dataProvider.dropTable(tableName);
+  }
+
+  @Test
+  public void testMinorAndMajorCompactionWithoutBuckets() throws Exception {
+    String dbName = "default";
+    String tableName = "testMinorCompaction_wobuckets_5";
+    String tempTableName = "tmp_txt_table_5";
+    // Scenario: run a MINOR and then a MAJOR compaction on the same table and check the result of each.
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createTableWithoutBucketWithMultipleSplits(dbName, tableName, tempTableName, true, true, false);
+
+    FileSystem fs = FileSystem.get(conf);
+    Table table = msClient.getTable(dbName, tableName);
+
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    // Verify deltas
+    List<String> expectedDeltas = new ArrayList<>();
+    expectedDeltas.add("delta_0000001_0000001_0000");
+    expectedDeltas.add("delta_0000006_0000006_0000");
+    expectedDeltas.add("delta_0000007_0000007_0000");
+    expectedDeltas.add("delta_0000008_0000008_0000");
+    Assert.assertEquals("Delta directories does not match",
+        expectedDeltas,
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    // Verify delete delta
+    List<String> expectedDeleteDeltas = new ArrayList<>();
+    expectedDeleteDeltas.add("delete_delta_0000002_0000002_0000");
+    expectedDeleteDeltas.add("delete_delta_0000003_0000003_0000");
+    expectedDeleteDeltas.add("delete_delta_0000004_0000004_0000");
+    expectedDeleteDeltas.add("delete_delta_0000005_0000005_0000");
+    Assert.assertEquals("Delete directories does not match",
+        expectedDeleteDeltas,
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null));
+
+    Set<String> expectedDeleteBucketFilesSet = new HashSet<>();
+    for (String expectedDeleteDelta : expectedDeleteDeltas) {
+      expectedDeleteBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDeleteDelta));
+    }
+    List<String> expectedDeleteBucketFiles = new ArrayList<>(expectedDeleteBucketFilesSet);
+    Collections.sort(expectedDeleteBucketFiles);
+
+    Set<String> expectedBucketFilesSet = new HashSet<>();
+    for (String expectedDelta : expectedDeltas) {
+      expectedBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDelta));
+    }
+    List<String> expectedBucketFiles = new ArrayList<>();
+    for (String expectedBucketFile : expectedBucketFilesSet) {
+      Pattern p = Pattern.compile("(bucket_[0-9]+)(_[0-9]+)?");
+      Matcher m = p.matcher(expectedBucketFile);
+      if (m.matches()) {
+        expectedBucketFiles.add(m.group(1));
+      }
+    }
+    Collections.sort(expectedBucketFiles);
+
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    CompactorTestUtil.runCleaner(conf);
+
+    // Only 1 compaction should be in the response queue with succeeded state
+    List<ShowCompactResponseElement> compacts =
+        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
+    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+    // Verify delta directories after compaction
+    List<String> actualDeltasAfterComp =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+    Assert.assertEquals("Delta directories does not match after compaction",
+        Collections.singletonList("delta_0000001_0000008_v0000024"), actualDeltasAfterComp);
+    List<String> actualDeleteDeltasAfterComp =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null);
+    Assert.assertEquals("Delete delta directories does not match after compaction",
+        Collections.singletonList("delete_delta_0000001_0000008_v0000024"), actualDeleteDeltasAfterComp);
+    // Read all contents, then verify bucket files in the delta and delete delta dirs
+    List<String> actualData = dataProvider.getAllData(tableName);
+
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+        CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedDeleteBucketFiles,
+        CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0)));
+    // Verify all contents
+    // (actualData was read above, right after the minor compaction)
+    Assert.assertEquals(expectedData, actualData);
+
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
+    // Clean up resources
+    CompactorTestUtil.runCleaner(conf);
+
+    // Both the minor and the major compaction should now be in the response queue
+    compacts =
+        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain two elements", 2, compacts.size());
+    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+    // Verify base directory after compaction
+    List<String> actualBasesAfterComp =
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null);
+    Assert.assertEquals("Base directory does not match after compaction",
+        Collections.singletonList("base_0000008_v0000038"), actualBasesAfterComp);
+    // Verify bucket files in the base dir
+    Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+        CompactorTestUtil.getBucketFileNames(fs, table, null, actualBasesAfterComp.get(0)));
+    // Verify all contents
+    actualData = dataProvider.getAllData(tableName);
+    Assert.assertEquals(expectedData, actualData);
+    dataProvider.dropTable(tableName);
+  }
+
+  @Test
   public void testMinorCompactionNotPartitionedWithBuckets() throws Exception {
     Assume.assumeTrue(runsOnTez);
     String dbName = "default";
@@ -630,6 +889,24 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
         CompactorTestUtil
             .getBucketFileNames(fs, table, partitionToday, actualDeleteDeltasAfterCompPartToday.get(0)));
 
+    // Verify contents of bucket files.
+    // Bucket 0
+    List<String> expectedRsBucket0 = Arrays
+        .asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3\tyesterday",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4\ttoday",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday");
+    List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912");
+    Assert.assertEquals(expectedRsBucket0, rsBucket0);
+
     // Verify all contents
     List<String> actualData = dataProvider.getAllData(tableName);
     Assert.assertEquals(expectedData, actualData);
@@ -704,14 +981,13 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912");
     Assert.assertEquals(expectedRsBucket0, rsBucket0);
     // Bucket 1
-    List<String> expectedRsBucket1 = Arrays.asList(
-        "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday",
+    List<String> expectedRsBucket1 = Arrays.asList("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday",
         "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday",
         "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow",
         "{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday",
-        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday",
+        "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday",
         "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday");
@@ -1250,7 +1526,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Insert test data into test table
     dataProvider.insertTestData(dbName, tableName);
     // Get all data before compaction is run
-    List<String> expectedData = dataProvider.getAllData(dbName, tableName);
+    List<String> expectedData = dataProvider.getAllData(dbName, tableName, false);
     Collections.sort(expectedData);
     // Run a compaction
     CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true);
@@ -1263,7 +1539,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
             + " compaction", Collections.singletonList(resultDirName),
         CompactorTestUtil.getBaseOrDeltaNames(fs, pathFilter, table, null));
     // Verify all contents
-    List<String> actualData = dataProvider.getAllData(dbName, tableName);
+    List<String> actualData = dataProvider.getAllData(dbName, tableName, false);
     Assert.assertEquals(expectedData, actualData);
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
index 9772bbe..dd9e06c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
@@ -49,6 +49,10 @@ import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeState
  */
 public class TestMmCompactorOnTez extends CompactorOnTezTest {
 
+  public TestMmCompactorOnTez() {
+    mmCompaction = true;
+  }
+
   @Test public void testMmMinorCompactionNotPartitionedWithoutBuckets() throws Exception {
     String dbName = "default";
     String tableName = "testMmMinorCompaction";
@@ -80,7 +84,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     // Verify bucket files in delta dirs
     List<String> expectedBucketFiles = Collections.singletonList("000000_0");
     Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
-        CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+        CompactorTestUtil.getBucketFileNamesForMMTables(fs, table, null, actualDeltasAfterComp.get(0)));
     verifyAllContents(tableName, testDataProvider, expectedData);
     // Clean up
     testDataProvider.dropTable(tableName);
@@ -123,7 +127,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     // Verify bucket files in delta dirs
     List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0");
     Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
-        CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+        CompactorTestUtil.getBucketFileNamesForMMTables(fs, table, null, actualDeltasAfterComp.get(0)));
     verifyAllContents(tableName, testDataProvider, expectedData);
     // Clean up
     testDataProvider.dropTable(tableName);
@@ -167,7 +171,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     List<String> expectedBucketFiles = Collections.singletonList("000000_0");
     Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
         CompactorTestUtil
-            .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
+            .getBucketFileNamesForMMTables(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
     verifyAllContents(tableName, dataProvider, expectedData);
     // Clean up
     dataProvider.dropTable(tableName);
@@ -239,7 +243,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0");
     Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
         CompactorTestUtil
-            .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
+            .getBucketFileNamesForMMTables(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
     verifyAllContents(tableName, dataProvider, expectedData);
     // Clean up
     dataProvider.dropTable(tableName);
@@ -280,7 +284,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     // Verify bucket file in delta dir
     List<String> expectedBucketFile = Collections.singletonList("000000_0");
     Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFile,
-        CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+        CompactorTestUtil.getBucketFileNamesForMMTables(fs, table, null, actualDeltasAfterComp.get(0)));
     verifyAllContents(tableName, dataProvider, expectedData);
     // Clean up
     dataProvider.dropTable(tableName);
@@ -553,7 +557,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     // Insert test data into test table
     dataProvider.insertMmTestData(dbName, tableName);
     // Get all data before compaction is run
-    List<String> expectedData = dataProvider.getAllData(dbName, tableName);
+    List<String> expectedData = dataProvider.getAllData(dbName, tableName, false);
     Collections.sort(expectedData);
     // Run a compaction
     CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true);
@@ -565,7 +569,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
     Assert.assertEquals("Result directories does not match after " + compactionType.name()
             + " compaction", Collections.singletonList(resultDirName),
         CompactorTestUtil.getBaseOrDeltaNames(fs, pathFilter, table, null));
-    List<String> actualData = dataProvider.getAllData(dbName, tableName);
+    List<String> actualData = dataProvider.getAllData(dbName, tableName, false);
     Collections.sort(actualData);
     Assert.assertEquals(expectedData, actualData);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 848c31f..9888315 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -334,18 +335,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         if (!isMmTable && !isDirectInsert) {
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
             finalPaths[filesIdx] = new Path(parent, taskWithExt);
-            if (conf.isCompactionTable()) {
-              // Helper tables used for compaction are external and non-acid. We need to keep
-              // track of the taskId to avoid overwrites in the case of multiple
-              // FileSinkOperators, and the file names need to reflect the correct bucketId
-              // because the files will eventually be placed in an acid table, and the
-              // OrcFileMergeOperator should not merge data belonging to different buckets.
-              // Therefore during compaction, data will be stored in the final directory like:
-              // ${hive_staging_dir}/final_dir/taskid/bucketId
-              // For example, ${hive_staging dir}/-ext-10002/000000_0/bucket_00000
-              finalPaths[filesIdx] = new Path(finalPaths[filesIdx],
-                  AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId));
-            }
           } else {
             finalPaths[filesIdx] =  new Path(buildTmpPath(), taskWithExt);
           }
@@ -505,6 +494,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   String taskId, originalTaskId;
 
   protected boolean filesCreated = false;
+  protected BitSet filesCreatedPerBucket = new BitSet();
 
   private void initializeSpecPath() {
     // For a query of the type:
@@ -753,6 +743,31 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
   }
 
+  /**
+   * There was an issue with the query-based MINOR compaction (HIVE-23763): the row distribution between the FileSinkOperators
+   * was not correlated correctly with the bucket numbers. So it could happen that rows from different buckets ended up in the same
+   * FileSinkOperator and got written out into one file. This is not correct: one bucket file must only contain rows from the same bucket.
+   * Therefore the FileSinkOperator was extended with this method to be able to handle rows from different buckets.
+   * In this case it will create a separate file for each bucket. This logic is similar to the one in the createDynamicBucket method.
+   * @param fsp the paths holder whose per-bucket arrays are grown if needed and initialized for the current bucketId
+   * @throws HiveException if creating the bucket file fails; wraps any exception thrown during the creation
+   */
+  protected void createBucketFilesForCompaction(FSPaths fsp) throws HiveException {
+    try {
+      if (fsp.outPaths.length < bucketId + 1) {
+        fsp.updaters = Arrays.copyOf(fsp.updaters, bucketId + 1);
+        fsp.outPaths = Arrays.copyOf(fsp.outPaths, bucketId + 1);
+        fsp.finalPaths = Arrays.copyOf(fsp.finalPaths, bucketId + 1);
+        fsp.outWriters = Arrays.copyOf(fsp.outWriters, bucketId + 1);
+        statsFromRecordWriter = Arrays.copyOf(statsFromRecordWriter, bucketId + 1);
+      }
+      createBucketForFileIdx(fsp, bucketId);
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+    filesCreatedPerBucket.set(bucketId);
+  }
+
   protected void createBucketFiles(FSPaths fsp) throws HiveException {
     try {
       int filesIdx = 0;
@@ -813,7 +828,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
       throws HiveException {
     try {
-      fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      if (conf.isCompactionTable()) {
+        fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId),
+            isNativeTable(), isSkewedStoredAsSubDirectories);
+      } else {
+        fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
           + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path "
@@ -962,16 +982,18 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     String lbDirName = null;
     lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
 
-    if (!bDynParts && !filesCreated) {
+    if (!bDynParts && (!filesCreated || conf.isCompactionTable())) {
       if (lbDirName != null) {
         if (valToPaths.get(lbDirName) == null) {
           createNewPaths(null, lbDirName);
         }
-      } else {
-        if (conf.isCompactionTable()) {
-          int bucketProperty = getBucketProperty(row);
-          bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+      } else if (conf.isCompactionTable()) {
+        int bucketProperty = getBucketProperty(row);
+        bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+        if (!filesCreatedPerBucket.get(bucketId)) {
+          createBucketFilesForCompaction(fsp);
         }
+      } else {
         createBucketFiles(fsp);
       }
     }
@@ -1063,7 +1085,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       // RecordUpdater expects to get the actual row, not a serialized version of it.  Thus we
       // pass the row rather than recordValue.
       if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
-        rowOutWriters[findWriterOffset(row)].write(recordValue);
+        writerOffset = bucketId;
+        if (!conf.isCompactionTable()) {
+          writerOffset = findWriterOffset(row);
+        }
+        rowOutWriters[writerOffset].write(recordValue);
       } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
         fpaths.updaters[findWriterOffset(row)].insert(conf.getTableWriteId(), row);
       } else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 5321188..efc129f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -453,6 +454,19 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
     if (LOG.isTraceEnabled()) {
       LOG.trace("Going to return hash code " + hashCode);
     }
+    if (conf.isCompaction()) {
+      int bucket;
+      Object bucketProperty = ((Object[]) row)[2];
+      if (bucketProperty == null) {
+        return hashCode;
+      }
+      if (bucketProperty instanceof Writable) {
+        bucket = ((IntWritable) bucketProperty).get();
+      } else {
+        bucket = (int) bucketProperty;
+      }
+      return BucketCodec.determineVersion(bucket).decodeWriterId(bucket);
+    }
     return hashCode;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 0d01d5f..4c12927 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4238,26 +4238,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
         files = new FileStatus[] {src};
       }
 
-      if (isCompactionTable) {
-        // Helper tables used for query-based compaction have a special file structure after
-        // filesink: tmpdir/attemptid/bucketid.
-        // We don't care about the attemptId anymore and don't want it in the table's final
-        // structure so just move the bucket files.
+      if (isCompactionTable && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE)) {
         try {
-          List<FileStatus> fileStatuses = new ArrayList<>();
-          for (FileStatus file : files) {
-            if (file.isDirectory() && AcidUtils.originalBucketFilter.accept(file.getPath())) {
-              FileStatus[] taskDir = srcFs.listStatus(file.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-              fileStatuses.addAll(Arrays.asList(taskDir));
-            } else {
-              fileStatuses.add(file);
-            }
-          }
-          files = fileStatuses.toArray(new FileStatus[files.length]);
-
-          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE)) {
-            AcidUtils.OrcAcidVersion.writeVersionFile(destf, destFs);
-          }
+          AcidUtils.OrcAcidVersion.writeVersionFile(destf, destFs);
         } catch (IOException e) {
           if (null != pool) {
             pool.shutdownNow();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java
index 65f86d1..55130b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java
@@ -189,7 +189,7 @@ final class HiveOpConverterUtils {
           reduceKeys.size(), numReducers, acidOperation, NullOrdering.defaultNullOrder(hiveConf));
     } else {
       rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag,
-          partitionCols, order, nullOrder, NullOrdering.defaultNullOrder(hiveConf), numReducers, acidOperation);
+          partitionCols, order, nullOrder, NullOrdering.defaultNullOrder(hiveConf), numReducers, acidOperation, false);
     }
 
     ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a66d23b..8b1df4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6887,6 +6887,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     boolean multiFileSpray = false;
     int numFiles = 1;
     int totalFiles = 1;
+    boolean isCompaction = false;
+    if (dest_tab != null && dest_tab.getParameters() != null) {
+      isCompaction = AcidUtils.isCompactionTable(dest_tab.getParameters());
+    }
 
     if (dest_tab.getNumBuckets() > 0 && !dest_tab.getBucketCols().isEmpty()) {
       enforceBucketing = true;
@@ -6956,8 +6960,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         nullOrder.append(sortOrder == DirectionUtils.ASCENDING_CODE ? 'a' : 'z');
       }
       input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(),
-          maxReducers,
-              acidOp);
+          maxReducers, acidOp, isCompaction);
       reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
       ctx.setMultiFileSpray(multiFileSpray);
       ctx.setNumFiles(numFiles);
@@ -8833,9 +8836,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       acidOp = getAcidType(Utilities.getTableDesc(dest_tab).getOutputFileFormatClass(), dest,
           AcidUtils.isInsertOnlyTable(dest_tab));
     }
+    boolean isCompaction = false;
+    if (dest_tab != null && dest_tab.getParameters() != null) {
+      isCompaction = AcidUtils.isCompactionTable(dest_tab.getParameters());
+    }
     Operator result = genReduceSinkPlan(
         input, partCols, sortCols, order.toString(), nullOrder.toString(),
-        numReducers, acidOp, true);
+        numReducers, acidOp, true, isCompaction);
     if (result.getParentOperators().size() == 1 &&
         result.getParentOperators().get(0) instanceof ReduceSinkOperator) {
       ((ReduceSinkOperator) result.getParentOperators().get(0))
@@ -8846,16 +8853,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private Operator genReduceSinkPlan(Operator<?> input,
                                      List<ExprNodeDesc> partitionCols, List<ExprNodeDesc> sortCols,
-                                     String sortOrder, String nullOrder, int numReducers, AcidUtils.Operation acidOp)
+                                     String sortOrder, String nullOrder, int numReducers, AcidUtils.Operation acidOp, boolean isCompaction)
       throws SemanticException {
     return genReduceSinkPlan(input, partitionCols, sortCols, sortOrder, nullOrder, numReducers,
-        acidOp, false);
+        acidOp, false, isCompaction);
   }
 
   @SuppressWarnings("nls")
   private Operator genReduceSinkPlan(Operator<?> input, List<ExprNodeDesc> partitionCols, List<ExprNodeDesc> sortCols,
                                      String sortOrder, String nullOrder, int numReducers, AcidUtils.Operation acidOp,
-                                     boolean pullConstants) throws SemanticException {
+                                     boolean pullConstants, boolean isCompaction) throws SemanticException {
 
     RowResolver inputRR = opParseCtx.get(input).getRowResolver();
 
@@ -8942,7 +8949,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
 
     ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(newSortCols, valueCols, outputColumns,
         false, -1, partitionCols, newSortOrder.toString(), newNullOrder.toString(), defaultNullOrder,
-        numReducers, acidOp);
+        numReducers, acidOp, isCompaction);
     Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc,
         new RowSchema(rsRR.getColumnInfos()), input), rsRR);
 
@@ -14519,7 +14526,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
        */
       buildPTFReduceSinkDetails(tabDef, partCols, orderCols, orderString, nullOrderString);
       input = genReduceSinkPlan(input, partCols, orderCols, orderString.toString(),
-          nullOrderString.toString(), -1, Operation.NOT_ACID);
+          nullOrderString.toString(), -1, Operation.NOT_ACID, false);
     }
 
     /*
@@ -14622,7 +14629,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     return genReduceSinkPlan(input, partCols, orderCols, order.toString(), nullOrder.toString(),
-        -1, Operation.NOT_ACID);
+        -1, Operation.NOT_ACID, false);
   }
 
   public static List<WindowExpressionSpec> parseSelect(String selectExprStr)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 996b2db..d42c3bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -711,7 +711,7 @@ public final class PlanUtils {
       List<ExprNodeDesc> keyCols, List<ExprNodeDesc> valueCols,
       List<String> outputColumnNames, boolean includeKeyCols, int tag,
       List<ExprNodeDesc> partitionCols, String order, String nullOrder, NullOrdering defaultNullOrder,
-      int numReducers, AcidUtils.Operation writeType) {
+      int numReducers, AcidUtils.Operation writeType, boolean isCompaction) {
     ReduceSinkDesc reduceSinkDesc = getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
             new ArrayList<List<Integer>>(),
             includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) :
@@ -723,6 +723,7 @@ public final class PlanUtils {
       reduceSinkDesc.setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.FIXED));
       reduceSinkDesc.setNumReducers(1);
     }
+    reduceSinkDesc.setIsCompaction(isCompaction);
     return reduceSinkDesc;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 34be0b6..9db01eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -96,6 +96,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
    */
   private int numBuckets;
   private List<ExprNodeDesc> bucketCols;
+  private boolean isCompaction;
 
   private int topN = -1;
   private float topNMemoryUsage = -1;
@@ -447,6 +448,14 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     this.numBuckets = numBuckets;
   }
 
+  public boolean isCompaction() {
+    return isCompaction;
+  }
+
+  public void setIsCompaction(boolean isCompaction) {
+    this.isCompaction = isCompaction;
+  }
+
   @Explain(displayName = "bucketingVersion", explainLevels = { Level.EXTENDED })
   public int getBucketingVersionForExplain() {
     return getBucketingVersion();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 1f732f9..8f6a977 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -115,6 +115,18 @@ abstract class QueryCompactor {
       }
       for (String query : compactionQueries) {
         LOG.info("Running {} compaction via query: {}", compactionInfo.isMajorCompaction() ? "major" : "minor", query);
+        if (!compactionInfo.isMajorCompaction()) {
+          // There was an issue with the query-based MINOR compaction (HIVE-23763): the row distribution between the FileSinkOperators
+          // was not correlated correctly with the bucket numbers. So we could end up with files containing rows from
+          // multiple buckets, or rows from the same bucket could end up in different FileSinkOperators. This behaviour resulted
+          // in corrupted files. To fix this, the FileSinkOperator has been extended to be able to handle rows from different buckets.
+          // But we also had to be sure that all rows from the same bucket would end up in the same FileSinkOperator. Therefore
+          // the ReduceSinkOperator has also been extended to distribute the rows by bucket numbers. To use this logic,
+          // the two optimisations below (bucketing-sorting and vectorized execution) have to be turned off for the MINOR compaction.
+          // The MAJOR compaction works differently and its query doesn't use reducers, so they should not be turned off for MAJOR compaction.
+          conf.set("hive.optimize.bucketingsorting", "false");
+          conf.set("hive.vectorized.execution.enabled", "false");
+        }
         DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
       }
       commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf, writeIds, compactorTxnId);