You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/07/21 09:58:06 UTC
[hive] branch master updated: HIVE-23853: CRUD based compaction
also should update ACID file version metadata (Peter Vary reviewed by Karen
Coppage and Marta Kuczora)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9563dd6 HIVE-23853: CRUD based compaction also should update ACID file version metadata (Peter Vary reviewed by Karen Coppage and Marta Kuczora)
9563dd6 is described below
commit 9563dd63188280f4b7c307f36e1ffffea0c69aec
Author: pvary <pv...@cloudera.com>
AuthorDate: Tue Jul 21 11:57:52 2020 +0200
HIVE-23853: CRUD based compaction also should update ACID file version metadata (Peter Vary reviewed by Karen Coppage and Marta Kuczora)
Closes (#1259)
---
.../hive/ql/txn/compactor/CompactorTestUtil.java | 5 +-
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 56 ++++++++-
.../org/apache/hadoop/hive/ql/io/orc/OrcFile.java | 7 ++
.../hadoop/hive/ql/io/orc/OrcOutputFormat.java | 14 ++-
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 4 +
.../org/apache/hadoop/hive/ql/TestTxnCommands.java | 139 +++++++--------------
.../ql/txn/compactor/CompactorTestUtilities.java | 128 +++++++++++++++++++
7 files changed, 252 insertions(+), 101 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index 577b839..3ca5d4c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -94,8 +95,8 @@ class CompactorTestUtil {
throws IOException {
Path path = partitionName == null ? new Path(table.getSd().getLocation(), deltaName) : new Path(
new Path(table.getSd().getLocation()), new Path(partitionName, deltaName));
- return Arrays.stream(fs.listStatus(path)).map(FileStatus::getPath).map(Path::getName).sorted()
- .collect(Collectors.toList());
+ return Arrays.stream(fs.listStatus(path, AcidUtils.hiddenFileFilter)).map(FileStatus::getPath).map(Path::getName)
+ .sorted().collect(Collectors.toList());
}
/**
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 2cd98ae..242e1cb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -67,6 +67,9 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
@Test
public void testMajorCompactionNotPartitionedWithoutBuckets() throws Exception {
+ boolean originalEnableVersionFile = conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, true);
+
String dbName = "default";
String tblName = "testMajorCompaction";
TestDataProvider testDataProvider = new TestDataProvider();
@@ -122,6 +125,10 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
testDataProvider.getBucketData(tblName, "536870912"));
// Check bucket file contents
checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 0);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs, true,
+ new String[] { AcidUtils.BASE_PREFIX});
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, originalEnableVersionFile);
}
/**
@@ -130,6 +137,9 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
*/
@Test
public void testMajorCompactionNotPartitioned4Buckets() throws Exception {
+ boolean originalEnableVersionFile = conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, false);
+
String dbName = "default";
String tblName = "testMajorCompaction";
executeStatementOnDriver("drop table if exists " + tblName, driver);
@@ -208,6 +218,10 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 0);
checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 1);
checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 2);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs, false,
+ new String[] { AcidUtils.BASE_PREFIX});
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, originalEnableVersionFile);
}
@Test
@@ -294,6 +308,9 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
checkBucketIdAndRowIdInAcidFile(fs, new Path(todayPath, "base_0000005_v0000009"), 0);
checkBucketIdAndRowIdInAcidFile(fs, new Path(tomorrowPath, "base_0000005_v0000013"), 0);
checkBucketIdAndRowIdInAcidFile(fs, new Path(yesterdayPath, "base_0000005_v0000017"), 0);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE), new String[] { AcidUtils.BASE_PREFIX});
}
@Test public void testMajorCompactionPartitionedWithBuckets() throws Exception {
@@ -401,6 +418,9 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionTomorrow), expectedBaseTomorrow), 1);
checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionYesterday), expectedBaseYesterday), 0);
checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionYesterday), expectedBaseYesterday), 1);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE), new String[] { AcidUtils.BASE_PREFIX});
}
@Test
@@ -468,6 +488,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
// Verify all contents
List<String> actualData = dataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, actualData);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+ new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
// Clean up
dataProvider.dropTable(tableName);
}
@@ -541,6 +566,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
// Verify all contents
List<String> actualData = dataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, actualData);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+ new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
// Clean up
dataProvider.dropTable(tableName);
}
@@ -603,6 +633,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
// Verify all contents
List<String> actualData = dataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, actualData);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+ new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
// Clean up
dataProvider.dropTable(tableName);
}
@@ -685,6 +720,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
// Verify all contents
List<String> actualData = dataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, actualData);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+ new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
// Clean up
dataProvider.dropTable(tableName);
}
@@ -738,6 +778,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
// Verify all contents
List<String> actualData = dataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, actualData);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+ new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
// Clean up
dataProvider.dropTable(tableName);
}
@@ -788,6 +833,10 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
Assert.assertEquals("Delete delta directories does not match after compaction",
Collections.singletonList("delete_delta_0000001_0000015_v0000044"), actualDeleteDeltasAfterComp);
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+ new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
}
@Test
@@ -821,7 +870,6 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
CompactorTestUtil.checkExpectedTxnsPresent(null,
new Path[] {new Path(table.getSd().getLocation(), "delta_0000001_0000005_v0000009")}, "a,b", "int:string",
0, 1L, 4L, null, 1);
-
} finally {
if (connection != null) {
connection.close();
@@ -853,6 +901,10 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
CompactorTestUtil.checkExpectedTxnsPresent(null,
new Path[] {new Path(table.getSd().getLocation(), "delta_0000001_0000006_v0000009")}, "a,b", "int:string", 0,
1L, 4L, Lists.newArrayList(5, 6), 1);
+
+ CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+ conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+ new String[] { AcidUtils.DELTA_PREFIX });
}
@Test
@@ -868,7 +920,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
.newArrayList(new CompactorTestUtil.StreamingConnectionOption(false, false),
new CompactorTestUtil.StreamingConnectionOption(true, false),
new CompactorTestUtil.StreamingConnectionOption(false, false)));
- // Now, copact
+ // Now, compact
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
// Find the location of the table
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 349eb25..9e99887 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.LlapDaemonInfo;
import org.apache.hadoop.hive.llap.LlapUtil;
import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.orc.FileMetadata;
@@ -153,6 +154,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
// the smallest stripe size would be 5120 rows, which changes the output
// of some of the tests.)
private int batchSize = 1000;
+ private boolean isCompaction;
WriterOptions(Properties tableProperties, Configuration conf) {
super(tableProperties, conf);
@@ -161,6 +163,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
LlapProxy.isDaemon()) {
memory(getThreadLocalOrcLlapMemoryManager(conf));
}
+ isCompaction = AcidUtils.isCompactionTable(tableProperties);
}
/**
@@ -349,6 +352,10 @@ public final class OrcFile extends org.apache.orc.OrcFile {
int getBatchSize() {
return batchSize;
}
+
+ boolean isCompaction() {
+ return isCompaction;
+ }
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 202f78b..c56fb6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -82,8 +82,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
public void write(NullWritable nullWritable,
OrcSerdeRow row) throws IOException {
if (writer == null) {
- options.inspector(row.getInspector());
- writer = OrcFile.createWriter(path, options);
+ init(row);
}
writer.addRow(row.getRow());
}
@@ -92,8 +91,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
public void write(Writable row) throws IOException {
OrcSerdeRow serdeRow = (OrcSerdeRow) row;
if (writer == null) {
- options.inspector(serdeRow.getInspector());
- writer = OrcFile.createWriter(path, options);
+ init(serdeRow);
}
writer.addRow(serdeRow.getRow());
}
@@ -121,6 +119,14 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
stats.setRowCount(null == writer ? 0 : writer.getNumberOfRows());
return stats;
}
+
+ private void init(OrcSerdeRow serdeRow) throws IOException {
+ options.inspector(serdeRow.getInspector());
+ writer = OrcFile.createWriter(path, options);
+ if (options.isCompaction()) {
+ AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer);
+ }
+ }
}
private OrcFile.WriterOptions getOptions(JobConf conf, Properties props) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e17086f..b60e099 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4206,6 +4206,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
}
}
files = fileStatuses.toArray(new FileStatus[files.length]);
+
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE)) {
+ AcidUtils.OrcAcidVersion.writeVersionFile(destf, destFs);
+ }
} catch (IOException e) {
if (null != pool) {
pool.shutdownNow();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 4043212..c10f822 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -18,16 +18,13 @@
package org.apache.hadoop.hive.ql;
import java.io.File;
-import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
-import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
@@ -37,16 +34,18 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockType;
@@ -67,13 +66,12 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.BucketCodec;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
-import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtilities;
import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Ignore;
@@ -1248,7 +1246,19 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
Assert.assertEquals(stringifyValues(expected), r);
}
@Test
- public void testVersioning() throws Exception {
+ public void testVersioningVersionFileEnabled() throws Exception {
+ acidVersionTest(true);
+ }
+
+ @Test
+ public void testVersioningVersionFileDisabled() throws Exception {
+ acidVersionTest(false);
+ }
+
+ private void acidVersionTest(boolean enableVersionFile) throws Exception {
+ boolean originalEnableVersionFile = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, enableVersionFile);
+
hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
// Need to close the thread local Hive object so that configuration change is reflected to HMS.
Hive.closeCurrent();
@@ -1257,105 +1267,48 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
int[][] data = {{1, 2}};
//create 1 delta file bucket_00000
runStatementOnDriver("insert into T" + makeValuesClause(data));
+ runStatementOnDriver("update T set a=3 where b=2");
- //delete the bucket files so now we have empty delta dirs
- List<String> rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
FileSystem fs = FileSystem.get(hiveConf);
- Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.DELTA_PREFIX));
- Path filePath = new Path(rs.get(0));
- int version = getAcidVersionFromDataFile(filePath, fs);
- //check it has expected version marker
- Assert.assertEquals("Unexpected version marker in " + filePath,
- AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
-
- //check that delta dir has a version file with expected value
- filePath = filePath.getParent();
- Assert.assertTrue(filePath.getName().startsWith(AcidUtils.DELTA_PREFIX));
- int versionFromMetaFile = getAcidVersionFromMetaFile(filePath, fs);
- Assert.assertEquals("Unexpected version marker in " + filePath,
- AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile);
+ RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(getWarehouseDir(), "t"), true);
+ CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile,
+ new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX });
- runStatementOnDriver("insert into T" + makeValuesClause(data));
- runStatementOnDriver("alter table T compact 'major'");
- runWorker(hiveConf);
+ runStatementOnDriver("alter table T compact 'minor'");
+ TestTxnCommands2.runWorker(hiveConf);
- //check status of compaction job
+ // Check status of compaction job
TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state",
TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+ Assert.assertTrue(resp.getCompacts().get(0).getType().equals(CompactionType.MINOR));
- rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
- Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.BASE_PREFIX));
-
- filePath = new Path(rs.get(0));
- version = getAcidVersionFromDataFile(filePath, fs);
- //check that files produced by compaction still have the version marker
- Assert.assertEquals("Unexpected version marker in " + filePath,
- AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
+ // Check the files after minor compaction
+ files = fs.listFiles(new Path(getWarehouseDir(), "t"), true);
+ CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile,
+ new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX });
- //check that compacted base dir has a version file with expected value
- filePath = filePath.getParent();
- Assert.assertTrue(filePath.getName().startsWith(AcidUtils.BASE_PREFIX));
- versionFromMetaFile = getAcidVersionFromMetaFile(filePath, fs);
- Assert.assertEquals("Unexpected version marker in " + filePath,
- AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile);
-
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, false);
runStatementOnDriver("insert into T" + makeValuesClause(data));
- //delete the bucket files so now we have empty delta dirs
- rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
- Optional<String> deltaDir = rs.stream().filter(p -> p.contains(AcidUtils.DELTA_PREFIX)).findAny();
- Assert.assertTrue("Delta dir should be present", deltaDir.isPresent());
- Assert.assertFalse("Version marker should not exists",
- fs.exists(AcidUtils.OrcAcidVersion.getVersionFilePath(new Path(deltaDir.get()).getParent())));
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, true);
- }
-
- private static final Charset UTF8 = Charset.forName("UTF-8");
- private static final int ORC_ACID_VERSION_DEFAULT = 0;
- /**
- * This is smart enough to handle streaming ingest where there could be a
- * {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX} side file.
- * @param dataFile - ORC acid data file
- * @return version property from file if there,
- * {@link #ORC_ACID_VERSION_DEFAULT} otherwise
- */
- private static int getAcidVersionFromDataFile(Path dataFile, FileSystem fs) throws IOException {
- FileStatus fileStatus = fs.getFileStatus(dataFile);
- Reader orcReader = OrcFile.createReader(dataFile,
- OrcFile.readerOptions(fs.getConf())
- .filesystem(fs)
- //make sure to check for side file in case streaming ingest died
- .maxLength(AcidUtils.getLogicalLength(fs, fileStatus)));
- if (orcReader.hasMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)) {
- char[] versionChar =
- UTF8.decode(orcReader.getMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)).array();
- String version = new String(versionChar);
- return Integer.valueOf(version);
- }
- return ORC_ACID_VERSION_DEFAULT;
- }
- private static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs)
- throws IOException {
- Path formatFile = AcidUtils.OrcAcidVersion.getVersionFilePath(deltaOrBaseDir);
- try (FSDataInputStream inputStream = fs.open(formatFile)) {
- byte[] bytes = new byte[1];
- int read = inputStream.read(bytes);
- if (read != -1) {
- String version = new String(bytes, UTF8);
- return Integer.valueOf(version);
- }
- return ORC_ACID_VERSION_DEFAULT;
- } catch (FileNotFoundException fnf) {
- LOG.debug(formatFile + " not found, returning default: " + ORC_ACID_VERSION_DEFAULT);
- return ORC_ACID_VERSION_DEFAULT;
- } catch(IOException ex) {
- LOG.error(formatFile + " is unreadable due to: " + ex.getMessage(), ex);
- throw ex;
- }
+ runStatementOnDriver("alter table T compact 'major'");
+ TestTxnCommands2.runWorker(hiveConf);
+
+ // Check status of compaction job
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
+ resp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
+ Assert.assertEquals("Unexpected 1 compaction state",
+ TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(1).getState());
+ Assert.assertTrue(resp.getCompacts().get(1).getHadoopJobId().startsWith("job_local"));
+
+ // Check the files after major compaction
+ files = fs.listFiles(new Path(getWarehouseDir(), "t"), true);
+ CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile,
+ new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.BASE_PREFIX });
+
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, originalEnableVersionFile);
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtilities.java
new file mode 100644
index 0000000..1a080b2
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtilities.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Test helpers for verifying the ACID version markers stored inside ACID data files and next to them.
+ */
+public class CompactorTestUtilities {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactorTestUtilities.class);
+
+  private static final Charset UTF8 = StandardCharsets.UTF_8;
+  /** Version reported when no version marker is found. */
+  private static final int ORC_ACID_VERSION_DEFAULT = 0;
+
+  /**
+   * Reads the ACID version stored in the ORC user metadata of a data file.
+   * This is smart enough to handle streaming ingest where there could be a
+   * {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX} side file.
+   * @param dataFile - ORC acid data file
+   * @param fs - file system holding {@code dataFile}
+   * @return version property from file if there,
+   * {@link #ORC_ACID_VERSION_DEFAULT} otherwise
+   * @throws IOException if the file cannot be read as ORC
+   */
+  private static int getAcidVersionFromDataFile(Path dataFile, FileSystem fs) throws IOException {
+    FileStatus fileStatus = fs.getFileStatus(dataFile);
+    Reader orcReader = OrcFile.createReader(dataFile,
+        OrcFile.readerOptions(fs.getConf())
+            .filesystem(fs)
+            //make sure to check for side file in case streaming ingest died
+            .maxLength(AcidUtils.getLogicalLength(fs, fileStatus)));
+    if (orcReader.hasMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)) {
+      // toString() honours the decoded buffer's position/limit, whereas array() returns the whole
+      // backing array, whose length is not guaranteed to match the decoded text.
+      String version =
+          UTF8.decode(orcReader.getMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)).toString();
+      return Integer.parseInt(version);
+    }
+    return ORC_ACID_VERSION_DEFAULT;
+  }
+
+  /**
+   * Reads the ACID version from the version file of a base or delta directory.
+   * @param deltaOrBaseDir - base or delta directory
+   * @param fs - file system holding the directory
+   * @return the version read from the version file, or
+   * {@link #ORC_ACID_VERSION_DEFAULT} if the file is missing or empty
+   * @throws IOException if the version file exists but cannot be read
+   */
+  private static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs)
+      throws IOException {
+    Path formatFile = AcidUtils.OrcAcidVersion.getVersionFilePath(deltaOrBaseDir);
+    try (FSDataInputStream inputStream = fs.open(formatFile)) {
+      // Only the first byte is read, so this assumes a single-digit version number
+      byte[] bytes = new byte[1];
+      int read = inputStream.read(bytes);
+      if (read != -1) {
+        return Integer.parseInt(new String(bytes, UTF8));
+      }
+      return ORC_ACID_VERSION_DEFAULT;
+    } catch (FileNotFoundException fnf) {
+      LOG.debug("{} not found, returning default: {}", formatFile, ORC_ACID_VERSION_DEFAULT);
+      return ORC_ACID_VERSION_DEFAULT;
+    } catch (IOException ex) {
+      LOG.error("{} is unreadable due to: {}", formatFile, ex.getMessage(), ex);
+      throw ex;
+    }
+  }
+
+  /**
+   * Asserts that every listed file and its parent directory carry the expected ACID version markers.
+   * <p>
+   * Each file other than the version file itself is read as ORC and must report
+   * {@link AcidUtils.OrcAcidVersion#ORC_ACID_VERSION}. Each distinct parent directory is checked
+   * once: when {@code hasVersionFile} is true its version file must exist and hold the expected
+   * version, otherwise it must be absent. Finally, the base / delta / delete delta prefixes seen on
+   * the parent directories must be exactly {@code expectedTypes}.
+   * @param files - files to check, e.g. a recursive listing of the table location
+   * @param fs - file system holding the files
+   * @param hasVersionFile - whether each directory is expected to contain a version file
+   * @param expectedTypes - exact set of directory prefixes expected, e.g. {@link AcidUtils#BASE_PREFIX}
+   * @throws IOException if a file or version file cannot be read
+   */
+  public static void checkAcidVersion(RemoteIterator<LocatedFileStatus> files, FileSystem fs, boolean hasVersionFile,
+      String[] expectedTypes) throws IOException {
+    Set<String> foundPrefixes = new HashSet<>(expectedTypes.length);
+    // Keyed by the full Path (not just the name) so that same-named directories in different
+    // partitions are each verified; Set.add() returning false skips directories already checked.
+    Set<Path> checkedDirectories = new HashSet<>();
+    while (files.hasNext()) {
+      Path path = files.next().getPath();
+      if (!path.getName().equals(AcidUtils.OrcAcidVersion.ACID_FORMAT)) {
+        // Every data file must carry the version marker in its ORC metadata
+        int version = getAcidVersionFromDataFile(path, fs);
+        Assert.assertEquals("Unexpected version marker in " + path.getName(),
+            AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
+      }
+      Path parent = path.getParent();
+      if (checkedDirectories.add(parent)) {
+        String parentName = parent.getName();
+        if (parentName.startsWith(AcidUtils.BASE_PREFIX)) {
+          foundPrefixes.add(AcidUtils.BASE_PREFIX);
+        } else if (parentName.startsWith(AcidUtils.DELTA_PREFIX)) {
+          foundPrefixes.add(AcidUtils.DELTA_PREFIX);
+        } else if (parentName.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+          foundPrefixes.add(AcidUtils.DELETE_DELTA_PREFIX);
+        }
+        Path versionFile = AcidUtils.OrcAcidVersion.getVersionFilePath(parent);
+        if (hasVersionFile) {
+          Assert.assertTrue("Version marker should exists", fs.exists(versionFile));
+          Assert.assertEquals("Unexpected version marker in " + parent,
+              AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, getAcidVersionFromMetaFile(parent, fs));
+        } else {
+          Assert.assertFalse("Version marker should not exists", fs.exists(versionFile));
+        }
+      }
+    }
+    Assert.assertEquals("Did not found all types of directories", new HashSet<>(Arrays.asList(expectedTypes)),
+        foundPrefixes);
+  }
+}