Posted to commits@hive.apache.org by ku...@apache.org on 2020/08/04 10:36:01 UTC
[hive] branch master updated: HIVE-23763: Query based minor compaction produces wrong files when ro… (#1327)
This is an automated email from the ASF dual-hosted git repository.
kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 98c925c HIVE-23763: Query based minor compaction produces wrong files when ro… (#1327)
98c925c is described below
commit 98c925c8cb52e410c9aa02a499d2ac15a6d6777b
Author: kuczoram <ku...@cloudera.com>
AuthorDate: Tue Aug 4 12:35:52 2020 +0200
HIVE-23763: Query based minor compaction produces wrong files when ro… (#1327)
* HIVE-23763: Query based minor compaction produces wrong files when rows with different buckets Ids are processed by the same FileSinkOperator
* HIVE-23763: Fix the MM compaction tests
* HIVE-23763: Address the review comments
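The fix revolves around decoding the writer (bucket) id from the encoded bucketid field of ROW__ID. Below is a minimal illustrative sketch of that decoding, not part of the commit itself; it reuses the BucketCodec calls that appear in the patch, and the encoded values are the bucket ids from the test expectations further down:

import org.apache.hadoop.hive.ql.io.BucketCodec;

public class BucketIdDecodeSketch {
  public static void main(String[] args) {
    // Encoded bucket properties as they appear in the ROW__ID values of the tests below.
    int[] bucketProperties = { 536870912, 536936448 };
    for (int bp : bucketProperties) {
      // Same call chain the patch uses in FileSinkOperator and ReduceSinkOperator.
      int writerId = BucketCodec.determineVersion(bp).decodeWriterId(bp);
      System.out.println(bp + " -> bucket " + writerId); // prints bucket 0 and bucket 1
    }
  }
}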
---
.../hive/ql/txn/compactor/CompactorOnTezTest.java | 150 ++++++++++-
.../hive/ql/txn/compactor/CompactorTestUtil.java | 13 +-
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 290 ++++++++++++++++++++-
.../ql/txn/compactor/TestMmCompactorOnTez.java | 18 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 64 +++--
.../hadoop/hive/ql/exec/ReduceSinkOperator.java | 14 +
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 21 +-
.../opconventer/HiveOpConverterUtils.java | 2 +-
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 25 +-
.../org/apache/hadoop/hive/ql/plan/PlanUtils.java | 3 +-
.../apache/hadoop/hive/ql/plan/ReduceSinkDesc.java | 9 +
.../hive/ql/txn/compactor/QueryCompactor.java | 12 +
12 files changed, 551 insertions(+), 70 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 71232de..08b9039 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -35,6 +35,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults;
import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
@@ -53,6 +55,7 @@ public class CompactorOnTezTest {
protected IMetaStoreClient msClient;
protected IDriver driver;
protected boolean runsOnTez = true;
+ protected boolean mmCompaction = false;
@Before
// Note: we create a new conf and driver object before every test
@@ -95,6 +98,17 @@ public class CompactorOnTezTest {
conf.set("hive.tez.container.size", "128");
conf.setBoolean("hive.merge.tezfiles", false);
conf.setBoolean("hive.in.tez.test", true);
+ if (!mmCompaction) {
+ // We need these settings to create a table that is not bucketed but still contains multiple files.
+ // With these parameters set, inserting 100 rows into the table distributes the rows
+ // across multiple files. This setup is used in the testMinorCompactionWithoutBuckets,
+ // testMinorCompactionWithoutBucketsInsertOverwrite and testMajorCompactionWithoutBucketsInsertAndDeleteInsertOverwrite
+ // tests in the TestCrudCompactorOnTez class.
+ // This use case has to be tested because of HIVE-23763. MM compaction is not affected by this issue,
+ // so there is no need to set these configs for MM compaction.
+ conf.set("tez.grouping.max-size", "1024");
+ conf.set("tez.grouping.min-size", "1");
+ }
}
@After public void tearDown() {
@@ -218,6 +232,77 @@ public class CompactorOnTezTest {
}
/**
+ * Creates a non-bucketed table in which the data is distributed
+ * across multiple splits. The initial data is 100 rows and should be split into
+ * multiple files, like bucket_00000, bucket_00001, ...
+ * This is needed because the query-based MINOR compaction had issues with such tables (HIVE-23763).
+ * @param dbName database of the table, or null to use the current database
+ * @param tblName name of the transactional table to create
+ * @param tempTblName base name of the external staging tables used for the inserts
+ * @param createDeletes whether to run delete statements after the initial insert
+ * @param createInserts whether to run additional rounds of inserts after the deletes
+ * @param insertOverwrite whether the initial load should use insert overwrite
+ * @throws Exception if any of the statements fails
+ */
+ void createTableWithoutBucketWithMultipleSplits(String dbName, String tblName, String tempTblName,
+ boolean createDeletes, boolean createInserts, boolean insertOverwrite) throws Exception {
+ if (dbName != null) {
+ tblName = dbName + "." + tblName;
+ tempTblName = dbName + "." + tempTblName;
+ }
+
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ StringBuilder query = new StringBuilder();
+ query.append("create table ").append(tblName).append(" (a string, b string, c string)");
+ query.append(" stored as orc");
+ query.append(" TBLPROPERTIES('transactional'='true',");
+ query.append(" 'transactional_properties'='default')");
+ executeStatementOnDriver(query.toString(), driver);
+
+ generateInsertsWithMultipleSplits(0, 100, tblName, tempTblName + "_1", insertOverwrite);
+
+ if (createDeletes) {
+ executeStatementOnDriver("delete from " + tblName + " where a in ('41','87','53','11')", driver);
+ executeStatementOnDriver("delete from " + tblName + " where a in ('42','88','81','12','86')", driver);
+ executeStatementOnDriver("delete from " + tblName + " where a in ('98')", driver);
+ executeStatementOnDriver("delete from " + tblName + " where a in ('40')", driver);
+ }
+
+ if (createInserts) {
+ generateInsertsWithMultipleSplits(100, 250, tblName, tempTblName + "_2", false);
+ generateInsertsWithMultipleSplits(300, 318, tblName, tempTblName + "_3", false);
+ generateInsertsWithMultipleSplits(400, 410, tblName, tempTblName + "_4", false);
+ }
+ }
+
+ private void generateInsertsWithMultipleSplits(int begin, int end, String tableName, String tempTableName,
+ boolean insertOverwrite) throws Exception {
+ StringBuffer sb = new StringBuffer();
+ for (int i = begin; i < end; i++) {
+ sb.append("('");
+ sb.append(i);
+ sb.append("','value");
+ sb.append(i);
+ sb.append("','this is some comment to increase the file size ");
+ sb.append(i);
+ sb.append("')");
+ if (i < end - 1) {
+ sb.append(",");
+ }
+ }
+ executeStatementOnDriver("DROP TABLE IF EXISTS " + tempTableName, driver);
+ executeStatementOnDriver(
+ "CREATE EXTERNAL TABLE " + tempTableName + " (id string, value string, comment string) STORED AS TEXTFILE ",
+ driver);
+ executeStatementOnDriver("insert into " + tempTableName + " values " + sb.toString(), driver);
+ if (insertOverwrite) {
+ executeStatementOnDriver("insert overwrite table " + tableName + " select * from " + tempTableName, driver);
+ } else {
+ executeStatementOnDriver("insert into " + tableName + " select * from " + tempTableName, driver);
+ }
+ }
+
+ /**
* 5 txns.
*/
void insertMmTestData(String tblName) throws Exception {
@@ -261,22 +346,77 @@ public class CompactorOnTezTest {
}
List<String> getAllData(String tblName) throws Exception {
- return getAllData(null, tblName);
+ return getAllData(null, tblName, false);
+ }
+
+ List<String> getAllData(String tblName, boolean withRowId) throws Exception {
+ return getAllData(null, tblName, withRowId);
}
- List<String> getAllData(String dbName, String tblName) throws Exception {
+ List<String> getAllData(String dbName, String tblName, boolean withRowId) throws Exception {
if (dbName != null) {
tblName = dbName + "." + tblName;
}
- List<String> result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver);
+ StringBuffer query = new StringBuffer();
+ query.append("select ");
+ if (withRowId) {
+ query.append("ROW__ID, ");
+ }
+ query.append("* from ");
+ query.append(tblName);
+ List<String> result = executeStatementOnDriverAndReturnResults(query.toString(), driver);
Collections.sort(result);
return result;
}
+ List<String> getDataWithInputFileNames(String dbName, String tblName) throws Exception {
+ if (dbName != null) {
+ tblName = dbName + "." + tblName;
+ }
+ StringBuffer query = new StringBuffer();
+ query.append("select ");
+ query.append("INPUT__FILE__NAME, a from ");
+ query.append(tblName);
+ query.append(" order by a");
+ List<String> result = executeStatementOnDriverAndReturnResults(query.toString(), driver);
+ return result;
+ }
+
+ boolean compareFileNames(List<String> expectedFileNames, List<String> actualFileNames) {
+ if (expectedFileNames.size() != actualFileNames.size()) {
+ return false;
+ }
+
+ Pattern p = Pattern.compile("(.*)(bucket_[0-9]+)(_[0-9]+)?");
+ for (int i = 0; i < expectedFileNames.size(); i++) {
+ String[] expectedParts = expectedFileNames.get(i).split("\t");
+ String[] actualParts = actualFileNames.get(i).split("\t");
+
+ if (!expectedParts[1].equals(actualParts[1])) {
+ return false;
+ }
+
+ String expectedFileName = null;
+ String actualFileName = null;
+ Matcher m = p.matcher(expectedParts[0]);
+ if (m.matches()) {
+ expectedFileName = m.group(2);
+ }
+ m = p.matcher(actualParts[0]);
+ if (m.matches()) {
+ actualFileName = m.group(2);
+ }
+
+ if (expectedFileName == null || actualFileName == null || !expectedFileName.equals(actualFileName)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
protected List<String> getBucketData(String tblName, String bucketId) throws Exception {
return executeStatementOnDriverAndReturnResults(
- "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order"
- + " by a, b", driver);
+ "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID, a, b", driver);
}
protected void dropTable(String tblName) throws Exception {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index 3ca5d4c..9db2229 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -95,8 +95,16 @@ class CompactorTestUtil {
throws IOException {
Path path = partitionName == null ? new Path(table.getSd().getLocation(), deltaName) : new Path(
new Path(table.getSd().getLocation()), new Path(partitionName, deltaName));
- return Arrays.stream(fs.listStatus(path, AcidUtils.hiddenFileFilter)).map(FileStatus::getPath).map(Path::getName)
- .sorted().collect(Collectors.toList());
+ return Arrays.stream(fs.listStatus(path, AcidUtils.bucketFileFilter)).map(FileStatus::getPath).map(Path::getName).sorted()
+ .collect(Collectors.toList());
+ }
+
+ static List<String> getBucketFileNamesForMMTables(FileSystem fs, Table table, String partitionName, String deltaName)
+ throws IOException {
+ Path path = partitionName == null ? new Path(table.getSd().getLocation(), deltaName) : new Path(
+ new Path(table.getSd().getLocation()), new Path(partitionName, deltaName));
+ return Arrays.stream(fs.listStatus(path, AcidUtils.hiddenFileFilter)).map(FileStatus::getPath).map(Path::getName).sorted()
+ .collect(Collectors.toList());
}
/**
@@ -160,6 +168,7 @@ class CompactorTestUtil {
throw new IOException("Failed to execute \"" + cmd + "\". Driver returned: " + e);
}
List<String> rs = new ArrayList<>();
+ driver.setMaxRows(400);
driver.getResults(rs);
return rs;
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 242e1cb..bba5278 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -21,7 +21,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
@@ -260,8 +264,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow",
"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday",
"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday",
- "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday",
"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday",
"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday",
"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday",
"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday"));
@@ -351,8 +355,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow",
"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday",
- "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday",
+ "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday");
@@ -498,6 +502,261 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
}
@Test
+ public void testMinorCompactionWithoutBuckets() throws Exception {
+ String dbName = "default";
+ String tableName = "testMinorCompaction_wobuckets_1";
+ String tempTableName = "tmp_txt_table_1";
+
+ List<String> expectedDeltas = new ArrayList<>();
+ expectedDeltas.add("delta_0000001_0000001_0000");
+ expectedDeltas.add("delta_0000006_0000006_0000");
+ expectedDeltas.add("delta_0000007_0000007_0000");
+ expectedDeltas.add("delta_0000008_0000008_0000");
+
+ List<String> expectedDeleteDeltas = new ArrayList<>();
+ expectedDeleteDeltas.add("delete_delta_0000002_0000002_0000");
+ expectedDeleteDeltas.add("delete_delta_0000003_0000003_0000");
+ expectedDeleteDeltas.add("delete_delta_0000004_0000004_0000");
+ expectedDeleteDeltas.add("delete_delta_0000005_0000005_0000");
+
+ testMinorCompactionWithoutBucketsCommon(dbName, tableName, tempTableName, false, expectedDeltas,
+ expectedDeleteDeltas, "delta_0000001_0000008_v0000025", CompactionType.MINOR);
+ }
+
+ @Test
+ public void testMinorCompactionWithoutBucketsInsertOverwrite() throws Exception {
+ String dbName = "default";
+ String tableName = "testMinorCompaction_wobuckets_2";
+ String tempTableName = "tmp_txt_table_2";
+
+ List<String> expectedDeltas = new ArrayList<>();
+ expectedDeltas.add("delta_0000006_0000006_0000");
+ expectedDeltas.add("delta_0000007_0000007_0000");
+ expectedDeltas.add("delta_0000008_0000008_0000");
+
+ List<String> expectedDeleteDeltas = new ArrayList<>();
+ expectedDeleteDeltas.add("delete_delta_0000002_0000002_0000");
+ expectedDeleteDeltas.add("delete_delta_0000003_0000003_0000");
+ expectedDeleteDeltas.add("delete_delta_0000004_0000004_0000");
+ expectedDeleteDeltas.add("delete_delta_0000005_0000005_0000");
+
+ testMinorCompactionWithoutBucketsCommon(dbName, tableName, tempTableName, true, expectedDeltas,
+ expectedDeleteDeltas, "delta_0000001_0000008_v0000025", CompactionType.MINOR);
+ }
+
+ @Test
+ public void testMajorCompactionWithoutBucketsInsertAndDeleteInsertOverwrite() throws Exception {
+ String dbName = "default";
+ String tableName = "testMinorCompaction_wobuckets_3";
+ String tempTableName = "tmp_txt_table_3";
+
+ List<String> expectedDeltas = new ArrayList<>();
+ expectedDeltas.add("delta_0000006_0000006_0000");
+ expectedDeltas.add("delta_0000007_0000007_0000");
+ expectedDeltas.add("delta_0000008_0000008_0000");
+
+ List<String> expectedDeleteDeltas = new ArrayList<>();
+ expectedDeleteDeltas.add("delete_delta_0000002_0000002_0000");
+ expectedDeleteDeltas.add("delete_delta_0000003_0000003_0000");
+ expectedDeleteDeltas.add("delete_delta_0000004_0000004_0000");
+ expectedDeleteDeltas.add("delete_delta_0000005_0000005_0000");
+
+ testMinorCompactionWithoutBucketsCommon(dbName, tableName, tempTableName, true, expectedDeltas,
+ expectedDeleteDeltas, "base_0000008_v0000025", CompactionType.MAJOR);
+ }
+
+ private void testMinorCompactionWithoutBucketsCommon(String dbName, String tableName, String tempTableName,
+ boolean insertOverWrite, List<String> expectedDeltas, List<String> expectedDeleteDeltas,
+ String expectedCompactedDeltaDirName, CompactionType compactionType) throws Exception {
+
+ TestDataProvider dataProvider = new TestDataProvider();
+ dataProvider.createTableWithoutBucketWithMultipleSplits(dbName, tableName, tempTableName, true, true,
+ insertOverWrite);
+
+ FileSystem fs = FileSystem.get(conf);
+ Table table = msClient.getTable(dbName, tableName);
+
+ List<String> expectedData = dataProvider.getAllData(tableName);
+ List<String> expectedFileNames = dataProvider.getDataWithInputFileNames(null, tableName);
+
+ // Verify deltas
+ Assert.assertEquals("Delta directories does not match", expectedDeltas,
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+ // Verify delete delta
+ Assert.assertEquals("Delete directories does not match", expectedDeleteDeltas,
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null));
+
+ Set<String> expectedDeleteBucketFilesSet = new HashSet<>();
+ for (String expectedDeleteDelta : expectedDeleteDeltas) {
+ expectedDeleteBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDeleteDelta));
+ }
+ List<String> expectedDeleteBucketFiles = new ArrayList<>(expectedDeleteBucketFilesSet);
+ Collections.sort(expectedDeleteBucketFiles);
+
+ Set<String> expectedBucketFilesSet = new HashSet<>();
+ for (String expectedDelta : expectedDeltas) {
+ expectedBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDelta));
+ }
+ List<String> expectedBucketFiles = new ArrayList<>();
+ for (String expectedBucketFile : expectedBucketFilesSet) {
+ Pattern p = Pattern.compile("(bucket_[0-9]+)(_[0-9]+)?");
+ Matcher m = p.matcher(expectedBucketFile);
+ if (m.matches()) {
+ expectedBucketFiles.add(m.group(1));
+ }
+ }
+ Collections.sort(expectedBucketFiles);
+
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true);
+ // Clean up resources
+ CompactorTestUtil.runCleaner(conf);
+
+ // Only 1 compaction should be in the response queue with succeeded state
+ List<ShowCompactResponseElement> compacts =
+ TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
+ Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+
+ // Verify delta and delete delta directories after compaction
+ if (CompactionType.MAJOR == compactionType) {
+ List<String> actualBasesAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null);
+ Assert.assertEquals("Base directory does not match after compaction",
+ Collections.singletonList(expectedCompactedDeltaDirName), actualBasesAfterComp);
+ // Verify bucket files in the base dir
+ Assert.assertEquals("Bucket names are not matching after compaction in the base folder",
+ expectedBucketFiles, CompactorTestUtil.getBucketFileNames(fs, table, null, actualBasesAfterComp.get(0)));
+ } else {
+ List<String> actualDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+ Assert.assertEquals("Delta directories does not match after compaction",
+ Collections.singletonList(expectedCompactedDeltaDirName), actualDeltasAfterComp);
+ List<String> actualDeleteDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null);
+ Assert.assertEquals("Delete delta directories does not match after compaction",
+ Collections.singletonList("delete_" + expectedCompactedDeltaDirName), actualDeleteDeltasAfterComp);
+ // Verify bucket files in delta and delete delta dirs
+ Assert.assertEquals("Bucket names are not matching after compaction in the delete deltas",
+ expectedDeleteBucketFiles,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0)));
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+ }
+
+ // Verify all contents
+ List<String> actualData = dataProvider.getAllData(tableName);
+ Assert.assertEquals(expectedData, actualData);
+ List<String> actualFileNames = dataProvider.getDataWithInputFileNames(null, tableName);
+ Assert.assertTrue(dataProvider.compareFileNames(expectedFileNames, actualFileNames));
+ dataProvider.dropTable(tableName);
+ }
+
+ @Test
+ public void testMinorAndMajorCompactionWithoutBuckets() throws Exception {
+ String dbName = "default";
+ String tableName = "testMinorCompaction_wobuckets_5";
+ String tempTableName = "tmp_txt_table_5";
+
+ TestDataProvider dataProvider = new TestDataProvider();
+ dataProvider.createTableWithoutBucketWithMultipleSplits(dbName, tableName, tempTableName, true, true, false);
+
+ FileSystem fs = FileSystem.get(conf);
+ Table table = msClient.getTable(dbName, tableName);
+
+ List<String> expectedData = dataProvider.getAllData(tableName);
+ // Verify deltas
+ List<String> expectedDeltas = new ArrayList<>();
+ expectedDeltas.add("delta_0000001_0000001_0000");
+ expectedDeltas.add("delta_0000006_0000006_0000");
+ expectedDeltas.add("delta_0000007_0000007_0000");
+ expectedDeltas.add("delta_0000008_0000008_0000");
+ Assert.assertEquals("Delta directories does not match",
+ expectedDeltas,
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+ // Verify delete delta
+ List<String> expectedDeleteDeltas = new ArrayList<>();
+ expectedDeleteDeltas.add("delete_delta_0000002_0000002_0000");
+ expectedDeleteDeltas.add("delete_delta_0000003_0000003_0000");
+ expectedDeleteDeltas.add("delete_delta_0000004_0000004_0000");
+ expectedDeleteDeltas.add("delete_delta_0000005_0000005_0000");
+ Assert.assertEquals("Delete directories does not match",
+ expectedDeleteDeltas,
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null));
+
+ Set<String> expectedDeleteBucketFilesSet = new HashSet<>();
+ for (String expectedDeleteDelta : expectedDeleteDeltas) {
+ expectedDeleteBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDeleteDelta));
+ }
+ List<String> expectedDeleteBucketFiles = new ArrayList<>(expectedDeleteBucketFilesSet);
+ Collections.sort(expectedDeleteBucketFiles);
+
+ Set<String> expectedBucketFilesSet = new HashSet<>();
+ for (String expectedDelta : expectedDeltas) {
+ expectedBucketFilesSet.addAll(CompactorTestUtil.getBucketFileNames(fs, table, null, expectedDelta));
+ }
+ List<String> expectedBucketFiles = new ArrayList<>();
+ for (String expectedBucketFile : expectedBucketFilesSet) {
+ Pattern p = Pattern.compile("(bucket_[0-9]+)(_[0-9]+)?");
+ Matcher m = p.matcher(expectedBucketFile);
+ if (m.matches()) {
+ expectedBucketFiles.add(m.group(1));
+ }
+ }
+ Collections.sort(expectedBucketFiles);
+
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+
+ // Only 1 compaction should be in the response queue with succeeded state
+ List<ShowCompactResponseElement> compacts =
+ TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
+ Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+ // Verify delta directories after compaction
+ List<String> actualDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+ Assert.assertEquals("Delta directories does not match after compaction",
+ Collections.singletonList("delta_0000001_0000008_v0000024"), actualDeltasAfterComp);
+ List<String> actualDeleteDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deleteEventDeltaDirFilter, table, null);
+ Assert.assertEquals("Delete delta directories does not match after compaction",
+ Collections.singletonList("delete_delta_0000001_0000008_v0000024"), actualDeleteDeltasAfterComp);
+ // Verify bucket files in delta dirs
+ List<String> actualData = dataProvider.getAllData(tableName);
+
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedDeleteBucketFiles,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeleteDeltasAfterComp.get(0)));
+ // Verify all contents
+ Assert.assertEquals(expectedData, actualData);
+
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
+ // Clean up resources
+ CompactorTestUtil.runCleaner(conf);
+
+ // Now 2 compactions should be in the response queue with succeeded state
+ compacts =
+ TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals("Completed compaction queue must contain one element", 2, compacts.size());
+ Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+ // Verify base directories after compaction
+ List<String> actualBasesAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null);
+ Assert.assertEquals("Base directory does not match after compaction",
+ Collections.singletonList("base_0000008_v0000038"), actualBasesAfterComp);
+ // Verify bucket files in the base dir
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualBasesAfterComp.get(0)));
+ // Verify all contents
+ actualData = dataProvider.getAllData(tableName);
+ Assert.assertEquals(expectedData, actualData);
+ dataProvider.dropTable(tableName);
+ }
+
+ @Test
public void testMinorCompactionNotPartitionedWithBuckets() throws Exception {
Assume.assumeTrue(runsOnTez);
String dbName = "default";
@@ -630,6 +889,24 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
CompactorTestUtil
.getBucketFileNames(fs, table, partitionToday, actualDeleteDeltasAfterCompPartToday.get(0)));
+ // Verify contents of bucket files.
+ // Bucket 0
+ List<String> expectedRsBucket0 = Arrays
+ .asList("{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t3\tyesterday",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t4\ttoday",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t3\ttoday",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t3\t4\tyesterday",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t4\t3\ttomorrow",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":2}\t4\t4\ttoday",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t2\tyesterday",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t4\ttoday",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t5\t3\tyesterday",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t6\t2\ttoday",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":2}\t6\t3\ttoday",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":3}\t6\t4\ttoday");
+ List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912");
+ Assert.assertEquals(expectedRsBucket0, rsBucket0);
+
// Verify all contents
List<String> actualData = dataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, actualData);
@@ -704,14 +981,13 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
List<String> rsBucket0 = dataProvider.getBucketData(tableName, "536870912");
Assert.assertEquals(expectedRsBucket0, rsBucket0);
// Bucket 1
- List<String> expectedRsBucket1 = Arrays.asList(
- "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday",
+ List<String> expectedRsBucket1 = Arrays.asList("{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t3\tyesterday",
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4\ttoday",
"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t3\ttomorrow",
"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4\ttoday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t2\tyesterday",
- "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":0}\t5\t4\ttoday",
+ "{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t5\t3\tyesterday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":1}\t6\t2\ttoday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":2}\t6\t3\ttoday",
"{\"writeid\":4,\"bucketid\":536936448,\"rowid\":3}\t6\t4\ttoday");
@@ -1250,7 +1526,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
// Insert test data into test table
dataProvider.insertTestData(dbName, tableName);
// Get all data before compaction is run
- List<String> expectedData = dataProvider.getAllData(dbName, tableName);
+ List<String> expectedData = dataProvider.getAllData(dbName, tableName, false);
Collections.sort(expectedData);
// Run a compaction
CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true);
@@ -1263,7 +1539,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
+ " compaction", Collections.singletonList(resultDirName),
CompactorTestUtil.getBaseOrDeltaNames(fs, pathFilter, table, null));
// Verify all contents
- List<String> actualData = dataProvider.getAllData(dbName, tableName);
+ List<String> actualData = dataProvider.getAllData(dbName, tableName, false);
Assert.assertEquals(expectedData, actualData);
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
index 9772bbe..dd9e06c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
@@ -49,6 +49,10 @@ import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeState
*/
public class TestMmCompactorOnTez extends CompactorOnTezTest {
+ public TestMmCompactorOnTez() {
+ mmCompaction = true;
+ }
+
@Test public void testMmMinorCompactionNotPartitionedWithoutBuckets() throws Exception {
String dbName = "default";
String tableName = "testMmMinorCompaction";
@@ -80,7 +84,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
// Verify bucket files in delta dirs
List<String> expectedBucketFiles = Collections.singletonList("000000_0");
Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
- CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+ CompactorTestUtil.getBucketFileNamesForMMTables(fs, table, null, actualDeltasAfterComp.get(0)));
verifyAllContents(tableName, testDataProvider, expectedData);
// Clean up
testDataProvider.dropTable(tableName);
@@ -123,7 +127,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
// Verify bucket files in delta dirs
List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0");
Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
- CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+ CompactorTestUtil.getBucketFileNamesForMMTables(fs, table, null, actualDeltasAfterComp.get(0)));
verifyAllContents(tableName, testDataProvider, expectedData);
// Clean up
testDataProvider.dropTable(tableName);
@@ -167,7 +171,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
List<String> expectedBucketFiles = Collections.singletonList("000000_0");
Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
CompactorTestUtil
- .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
+ .getBucketFileNamesForMMTables(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
verifyAllContents(tableName, dataProvider, expectedData);
// Clean up
dataProvider.dropTable(tableName);
@@ -239,7 +243,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0");
Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
CompactorTestUtil
- .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
+ .getBucketFileNamesForMMTables(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
verifyAllContents(tableName, dataProvider, expectedData);
// Clean up
dataProvider.dropTable(tableName);
@@ -280,7 +284,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
// Verify bucket file in delta dir
List<String> expectedBucketFile = Collections.singletonList("000000_0");
Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFile,
- CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+ CompactorTestUtil.getBucketFileNamesForMMTables(fs, table, null, actualDeltasAfterComp.get(0)));
verifyAllContents(tableName, dataProvider, expectedData);
// Clean up
dataProvider.dropTable(tableName);
@@ -553,7 +557,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
// Insert test data into test table
dataProvider.insertMmTestData(dbName, tableName);
// Get all data before compaction is run
- List<String> expectedData = dataProvider.getAllData(dbName, tableName);
+ List<String> expectedData = dataProvider.getAllData(dbName, tableName, false);
Collections.sort(expectedData);
// Run a compaction
CompactorTestUtil.runCompaction(conf, dbName, tableName, compactionType, true);
@@ -565,7 +569,7 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
Assert.assertEquals("Result directories does not match after " + compactionType.name()
+ " compaction", Collections.singletonList(resultDirName),
CompactorTestUtil.getBaseOrDeltaNames(fs, pathFilter, table, null));
- List<String> actualData = dataProvider.getAllData(dbName, tableName);
+ List<String> actualData = dataProvider.getAllData(dbName, tableName, false);
Collections.sort(actualData);
Assert.assertEquals(expectedData, actualData);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 848c31f..9888315 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -334,18 +335,6 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
if (!isMmTable && !isDirectInsert) {
if (!bDynParts && !isSkewedStoredAsSubDirectories) {
finalPaths[filesIdx] = new Path(parent, taskWithExt);
- if (conf.isCompactionTable()) {
- // Helper tables used for compaction are external and non-acid. We need to keep
- // track of the taskId to avoid overwrites in the case of multiple
- // FileSinkOperators, and the file names need to reflect the correct bucketId
- // because the files will eventually be placed in an acid table, and the
- // OrcFileMergeOperator should not merge data belonging to different buckets.
- // Therefore during compaction, data will be stored in the final directory like:
- // ${hive_staging_dir}/final_dir/taskid/bucketId
- // For example, ${hive_staging dir}/-ext-10002/000000_0/bucket_00000
- finalPaths[filesIdx] = new Path(finalPaths[filesIdx],
- AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId));
- }
} else {
finalPaths[filesIdx] = new Path(buildTmpPath(), taskWithExt);
}
@@ -505,6 +494,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
String taskId, originalTaskId;
protected boolean filesCreated = false;
+ protected BitSet filesCreatedPerBucket = new BitSet();
private void initializeSpecPath() {
// For a query of the type:
@@ -753,6 +743,31 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
}
+ /**
+ * The query-based MINOR compaction had an issue (HIVE-23763): the row distribution between the FileSinkOperators
+ * was not correlated correctly with the bucket numbers, so rows from different buckets could end up in the same
+ * FileSinkOperator and get written out into one file. This is incorrect: one bucket file must only contain rows from a single bucket.
+ * Therefore the FileSinkOperator was extended with this method to be able to handle rows from different buckets.
+ * In this case it creates a separate file for each bucket. This logic is similar to the one in the createDynamicBucket method.
+ * @param fsp the FSPaths instance whose path, writer and updater arrays are extended for the new bucket
+ * @throws HiveException if creating the bucket file fails
+ */
+ protected void createBucketFilesForCompaction(FSPaths fsp) throws HiveException {
+ try {
+ if (fsp.outPaths.length < bucketId + 1) {
+ fsp.updaters = Arrays.copyOf(fsp.updaters, bucketId + 1);
+ fsp.outPaths = Arrays.copyOf(fsp.outPaths, bucketId + 1);
+ fsp.finalPaths = Arrays.copyOf(fsp.finalPaths, bucketId + 1);
+ fsp.outWriters = Arrays.copyOf(fsp.outWriters, bucketId + 1);
+ statsFromRecordWriter = Arrays.copyOf(statsFromRecordWriter, bucketId + 1);
+ }
+ createBucketForFileIdx(fsp, bucketId);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ filesCreatedPerBucket.set(bucketId);
+ }
+
protected void createBucketFiles(FSPaths fsp) throws HiveException {
try {
int filesIdx = 0;
@@ -813,7 +828,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
throws HiveException {
try {
- fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+ if (conf.isCompactionTable()) {
+ fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId),
+ isNativeTable(), isSkewedStoredAsSubDirectories);
+ } else {
+ fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+ }
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
+ "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path "
@@ -962,16 +982,18 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
String lbDirName = null;
lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
- if (!bDynParts && !filesCreated) {
+ if (!bDynParts && (!filesCreated || conf.isCompactionTable())) {
if (lbDirName != null) {
if (valToPaths.get(lbDirName) == null) {
createNewPaths(null, lbDirName);
}
- } else {
- if (conf.isCompactionTable()) {
- int bucketProperty = getBucketProperty(row);
- bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+ } else if (conf.isCompactionTable()) {
+ int bucketProperty = getBucketProperty(row);
+ bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+ if (!filesCreatedPerBucket.get(bucketId)) {
+ createBucketFilesForCompaction(fsp);
}
+ } else {
createBucketFiles(fsp);
}
}
@@ -1063,7 +1085,11 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
// RecordUpdater expects to get the actual row, not a serialized version of it. Thus we
// pass the row rather than recordValue.
if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
- rowOutWriters[findWriterOffset(row)].write(recordValue);
+ writerOffset = bucketId;
+ if (!conf.isCompactionTable()) {
+ writerOffset = findWriterOffset(row);
+ }
+ rowOutWriters[writerOffset].write(recordValue);
} else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
fpaths.updaters[findWriterOffset(row)].insert(conf.getTableWriteId(), row);
} else {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
index 5321188..efc129f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CompilationOpContext;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.BucketCodec;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -453,6 +454,19 @@ public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
if (LOG.isTraceEnabled()) {
LOG.trace("Going to return hash code " + hashCode);
}
+ if (conf.isCompaction()) {
+ int bucket;
+ Object bucketProperty = ((Object[]) row)[2];
+ if (bucketProperty == null) {
+ return hashCode;
+ }
+ if (bucketProperty instanceof Writable) {
+ bucket = ((IntWritable) bucketProperty).get();
+ } else {
+ bucket = (int) bucketProperty;
+ }
+ return BucketCodec.determineVersion(bucket).decodeWriterId(bucket);
+ }
return hashCode;
}
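For reference, the effect of this hashCode override can be sketched as follows. This is an illustration rather than part of the patch, and it assumes the usual Hadoop-style (hash & Integer.MAX_VALUE) % numReducers partitioning: once the decoded writer id is returned as the hash code, all rows of one bucket reach the same reducer and hence the same FileSinkOperator.

import org.apache.hadoop.hive.ql.io.BucketCodec;

public class BucketRoutingSketch {
  // Illustrative helper (not in the patch): which reducer a row with the given
  // encoded bucketProperty is routed to when the hash code is the decoded writer id.
  static int reducerFor(int bucketProperty, int numReducers) {
    int hash = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
    return (hash & Integer.MAX_VALUE) % numReducers;
  }

  public static void main(String[] args) {
    System.out.println(reducerFor(536870912, 2)); // bucket 0 -> reducer 0
    System.out.println(reducerFor(536936448, 2)); // bucket 1 -> reducer 1
  }
}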
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 0d01d5f..4c12927 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4238,26 +4238,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
files = new FileStatus[] {src};
}
- if (isCompactionTable) {
- // Helper tables used for query-based compaction have a special file structure after
- // filesink: tmpdir/attemptid/bucketid.
- // We don't care about the attemptId anymore and don't want it in the table's final
- // structure so just move the bucket files.
+ if (isCompactionTable && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE)) {
try {
- List<FileStatus> fileStatuses = new ArrayList<>();
- for (FileStatus file : files) {
- if (file.isDirectory() && AcidUtils.originalBucketFilter.accept(file.getPath())) {
- FileStatus[] taskDir = srcFs.listStatus(file.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
- fileStatuses.addAll(Arrays.asList(taskDir));
- } else {
- fileStatuses.add(file);
- }
- }
- files = fileStatuses.toArray(new FileStatus[files.length]);
-
- if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE)) {
- AcidUtils.OrcAcidVersion.writeVersionFile(destf, destFs);
- }
+ AcidUtils.OrcAcidVersion.writeVersionFile(destf, destFs);
} catch (IOException e) {
if (null != pool) {
pool.shutdownNow();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java
index 65f86d1..55130b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/calcite/translator/opconventer/HiveOpConverterUtils.java
@@ -189,7 +189,7 @@ final class HiveOpConverterUtils {
reduceKeys.size(), numReducers, acidOperation, NullOrdering.defaultNullOrder(hiveConf));
} else {
rsDesc = PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, outputColumnNames, false, tag,
- partitionCols, order, nullOrder, NullOrdering.defaultNullOrder(hiveConf), numReducers, acidOperation);
+ partitionCols, order, nullOrder, NullOrdering.defaultNullOrder(hiveConf), numReducers, acidOperation, false);
}
ReduceSinkOperator rsOp = (ReduceSinkOperator) OperatorFactory.getAndMakeChild(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index a66d23b..8b1df4b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6887,6 +6887,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
boolean multiFileSpray = false;
int numFiles = 1;
int totalFiles = 1;
+ boolean isCompaction = false;
+ if (dest_tab != null && dest_tab.getParameters() != null) {
+ isCompaction = AcidUtils.isCompactionTable(dest_tab.getParameters());
+ }
if (dest_tab.getNumBuckets() > 0 && !dest_tab.getBucketCols().isEmpty()) {
enforceBucketing = true;
@@ -6956,8 +6960,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
nullOrder.append(sortOrder == DirectionUtils.ASCENDING_CODE ? 'a' : 'z');
}
input = genReduceSinkPlan(input, partnCols, sortCols, order.toString(), nullOrder.toString(),
- maxReducers,
- acidOp);
+ maxReducers, acidOp, isCompaction);
reduceSinkOperatorsAddedByEnforceBucketingSorting.add((ReduceSinkOperator)input.getParentOperators().get(0));
ctx.setMultiFileSpray(multiFileSpray);
ctx.setNumFiles(numFiles);
@@ -8833,9 +8836,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
acidOp = getAcidType(Utilities.getTableDesc(dest_tab).getOutputFileFormatClass(), dest,
AcidUtils.isInsertOnlyTable(dest_tab));
}
+ boolean isCompaction = false;
+ if (dest_tab != null && dest_tab.getParameters() != null) {
+ isCompaction = AcidUtils.isCompactionTable(dest_tab.getParameters());
+ }
Operator result = genReduceSinkPlan(
input, partCols, sortCols, order.toString(), nullOrder.toString(),
- numReducers, acidOp, true);
+ numReducers, acidOp, true, isCompaction);
if (result.getParentOperators().size() == 1 &&
result.getParentOperators().get(0) instanceof ReduceSinkOperator) {
((ReduceSinkOperator) result.getParentOperators().get(0))
@@ -8846,16 +8853,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
private Operator genReduceSinkPlan(Operator<?> input,
List<ExprNodeDesc> partitionCols, List<ExprNodeDesc> sortCols,
- String sortOrder, String nullOrder, int numReducers, AcidUtils.Operation acidOp)
+ String sortOrder, String nullOrder, int numReducers, AcidUtils.Operation acidOp, boolean isCompaction)
throws SemanticException {
return genReduceSinkPlan(input, partitionCols, sortCols, sortOrder, nullOrder, numReducers,
- acidOp, false);
+ acidOp, false, isCompaction);
}
@SuppressWarnings("nls")
private Operator genReduceSinkPlan(Operator<?> input, List<ExprNodeDesc> partitionCols, List<ExprNodeDesc> sortCols,
String sortOrder, String nullOrder, int numReducers, AcidUtils.Operation acidOp,
- boolean pullConstants) throws SemanticException {
+ boolean pullConstants, boolean isCompaction) throws SemanticException {
RowResolver inputRR = opParseCtx.get(input).getRowResolver();
@@ -8942,7 +8949,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
ReduceSinkDesc rsdesc = PlanUtils.getReduceSinkDesc(newSortCols, valueCols, outputColumns,
false, -1, partitionCols, newSortOrder.toString(), newNullOrder.toString(), defaultNullOrder,
- numReducers, acidOp);
+ numReducers, acidOp, isCompaction);
Operator interim = putOpInsertMap(OperatorFactory.getAndMakeChild(rsdesc,
new RowSchema(rsRR.getColumnInfos()), input), rsRR);
@@ -14519,7 +14526,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
*/
buildPTFReduceSinkDetails(tabDef, partCols, orderCols, orderString, nullOrderString);
input = genReduceSinkPlan(input, partCols, orderCols, orderString.toString(),
- nullOrderString.toString(), -1, Operation.NOT_ACID);
+ nullOrderString.toString(), -1, Operation.NOT_ACID, false);
}
/*
@@ -14622,7 +14629,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
}
return genReduceSinkPlan(input, partCols, orderCols, order.toString(), nullOrder.toString(),
- -1, Operation.NOT_ACID);
+ -1, Operation.NOT_ACID, false);
}
public static List<WindowExpressionSpec> parseSelect(String selectExprStr)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
index 996b2db..d42c3bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
@@ -711,7 +711,7 @@ public final class PlanUtils {
List<ExprNodeDesc> keyCols, List<ExprNodeDesc> valueCols,
List<String> outputColumnNames, boolean includeKeyCols, int tag,
List<ExprNodeDesc> partitionCols, String order, String nullOrder, NullOrdering defaultNullOrder,
- int numReducers, AcidUtils.Operation writeType) {
+ int numReducers, AcidUtils.Operation writeType, boolean isCompaction) {
ReduceSinkDesc reduceSinkDesc = getReduceSinkDesc(keyCols, keyCols.size(), valueCols,
new ArrayList<List<Integer>>(),
includeKeyCols ? outputColumnNames.subList(0, keyCols.size()) :
@@ -723,6 +723,7 @@ public final class PlanUtils {
reduceSinkDesc.setReducerTraits(EnumSet.of(ReduceSinkDesc.ReducerTraits.FIXED));
reduceSinkDesc.setNumReducers(1);
}
+ reduceSinkDesc.setIsCompaction(isCompaction);
return reduceSinkDesc;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 34be0b6..9db01eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -96,6 +96,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
*/
private int numBuckets;
private List<ExprNodeDesc> bucketCols;
+ private boolean isCompaction;
private int topN = -1;
private float topNMemoryUsage = -1;
@@ -447,6 +448,14 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
this.numBuckets = numBuckets;
}
+ public boolean isCompaction() {
+ return isCompaction;
+ }
+
+ public void setIsCompaction(boolean isCompaction) {
+ this.isCompaction = isCompaction;
+ }
+
@Explain(displayName = "bucketingVersion", explainLevels = { Level.EXTENDED })
public int getBucketingVersionForExplain() {
return getBucketingVersion();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 1f732f9..8f6a977 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -115,6 +115,18 @@ abstract class QueryCompactor {
}
for (String query : compactionQueries) {
LOG.info("Running {} compaction via query: {}", compactionInfo.isMajorCompaction() ? "major" : "minor", query);
+ if (!compactionInfo.isMajorCompaction()) {
+ // The query-based MINOR compaction had an issue (HIVE-23763): the row distribution between the FileSinkOperators
+ // was not correlated correctly with the bucket numbers. So we could end up with files containing rows from
+ // multiple buckets, or rows from the same bucket could end up in different FileSinkOperators. This behaviour resulted
+ // in corrupted files. To fix this, the FileSinkOperator has been extended to be able to handle rows from different buckets.
+ // But we also had to make sure that all rows from the same bucket end up in the same FileSinkOperator. Therefore
+ // the ReduceSinkOperator has also been extended to distribute the rows by bucket number. To use this logic,
+ // these two optimisations have to be turned off for MINOR compaction. MAJOR compaction works differently
+ // and its query doesn't use reducers, so these optimisations should not be turned off for MAJOR compaction.
+ conf.set("hive.optimize.bucketingsorting", "false");
+ conf.set("hive.vectorized.execution.enabled", "false");
+ }
DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
}
commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf, writeIds, compactorTxnId);
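For reference, a minimal sketch of the session-level overrides that the hunk above applies before running a MINOR compaction query. The property names are verbatim from the patch; the surrounding setup is illustrative only:

import org.apache.hadoop.hive.conf.HiveConf;

public class MinorCompactionConfSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Turned off for MINOR compaction so the bucket-aware ReduceSink/FileSink path is used.
    conf.set("hive.optimize.bucketingsorting", "false");
    conf.set("hive.vectorized.execution.enabled", "false");
    System.out.println(conf.get("hive.optimize.bucketingsorting")); // false
  }
}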