Posted to commits@hive.apache.org by pv...@apache.org on 2020/07/21 09:58:06 UTC

[hive] branch master updated: HIVE-23853: CRUD based compaction also should update ACID file version metadata (Peter Vary reviewed by Karen Coppage and Marta Kuczora)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 9563dd6  HIVE-23853: CRUD based compaction also should update ACID file version metadata (Peter Vary reviewed by Karen Coppage and Marta Kuczora)
9563dd6 is described below

commit 9563dd63188280f4b7c307f36e1ffffea0c69aec
Author: pvary <pv...@cloudera.com>
AuthorDate: Tue Jul 21 11:57:52 2020 +0200

    HIVE-23853: CRUD based compaction also should update ACID file version metadata (Peter Vary reviewed by Karen Coppage and Marta Kuczora)
    
    Closes (#1259)
---
 .../hive/ql/txn/compactor/CompactorTestUtil.java   |   5 +-
 .../ql/txn/compactor/TestCrudCompactorOnTez.java   |  56 ++++++++-
 .../org/apache/hadoop/hive/ql/io/orc/OrcFile.java  |   7 ++
 .../hadoop/hive/ql/io/orc/OrcOutputFormat.java     |  14 ++-
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   |   4 +
 .../org/apache/hadoop/hive/ql/TestTxnCommands.java | 139 +++++++--------------
 .../ql/txn/compactor/CompactorTestUtilities.java   | 128 +++++++++++++++++++
 7 files changed, 252 insertions(+), 101 deletions(-)
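
For orientation before reading the patch: the change stamps the ACID version in two places when CRUD compaction writes its output. The sketch below is not part of the commit; it condenses the OrcOutputFormat.java and Hive.java hunks into a standalone helper with placeholder package, class, and method names, and assumes the real call sites supply the writer, destination directory, and configuration.

package org.example.sketch;                      // placeholder package, illustration only

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.Writer;

public class AcidVersionWriteSketch {

  /**
   * Mirrors the OrcOutputFormat.java hunk: when the ORC writer belongs to a
   * compaction table, stamp the ACID version into the data file's user metadata.
   */
  static void stampDataFile(Writer writer, boolean isCompaction) throws IOException {
    if (isCompaction) {
      AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer);
    }
  }

  /**
   * Mirrors the Hive.java hunk: once the compacted files are in place, write the
   * _orc_acid_version side file into the target directory, if the flag is enabled.
   */
  static void stampDirectory(HiveConf conf, FileSystem fs, Path compactedDir) throws IOException {
    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE)) {
      AcidUtils.OrcAcidVersion.writeVersionFile(compactedDir, fs);
    }
  }
}

In the actual patch, the compaction flag comes from the table properties via AcidUtils.isCompactionTable, and the directory marker is guarded by HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, as the hunks below show.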

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
index 577b839..3ca5d4c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtil.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
@@ -94,8 +95,8 @@ class CompactorTestUtil {
       throws IOException {
     Path path = partitionName == null ? new Path(table.getSd().getLocation(), deltaName) : new Path(
         new Path(table.getSd().getLocation()), new Path(partitionName, deltaName));
-    return Arrays.stream(fs.listStatus(path)).map(FileStatus::getPath).map(Path::getName).sorted()
-        .collect(Collectors.toList());
+    return Arrays.stream(fs.listStatus(path, AcidUtils.hiddenFileFilter)).map(FileStatus::getPath).map(Path::getName)
+        .sorted().collect(Collectors.toList());
   }
 
   /**
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 2cd98ae..242e1cb 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -67,6 +67,9 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
   @Test
   public void testMajorCompactionNotPartitionedWithoutBuckets() throws Exception {
+    boolean originalEnableVersionFile = conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, true);
+
     String dbName = "default";
     String tblName = "testMajorCompaction";
     TestDataProvider testDataProvider = new TestDataProvider();
@@ -122,6 +125,10 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
         testDataProvider.getBucketData(tblName, "536870912"));
     // Check bucket file contents
     checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 0);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs, true,
+        new String[] { AcidUtils.BASE_PREFIX});
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, originalEnableVersionFile);
   }
 
   /**
@@ -130,6 +137,9 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
    */
   @Test
   public void testMajorCompactionNotPartitioned4Buckets() throws Exception {
+    boolean originalEnableVersionFile = conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, false);
+
     String dbName = "default";
     String tblName = "testMajorCompaction";
     executeStatementOnDriver("drop table if exists " + tblName, driver);
@@ -208,6 +218,10 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 0);
     checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 1);
     checkBucketIdAndRowIdInAcidFile(fs, new Path(table.getSd().getLocation(), expectedBase), 2);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs, false,
+        new String[] { AcidUtils.BASE_PREFIX});
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, originalEnableVersionFile);
   }
 
   @Test
@@ -294,6 +308,9 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     checkBucketIdAndRowIdInAcidFile(fs, new Path(todayPath, "base_0000005_v0000009"), 0);
     checkBucketIdAndRowIdInAcidFile(fs, new Path(tomorrowPath, "base_0000005_v0000013"), 0);
     checkBucketIdAndRowIdInAcidFile(fs, new Path(yesterdayPath, "base_0000005_v0000017"), 0);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE), new String[] { AcidUtils.BASE_PREFIX});
   }
 
   @Test public void testMajorCompactionPartitionedWithBuckets() throws Exception {
@@ -401,6 +418,9 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionTomorrow), expectedBaseTomorrow), 1);
     checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionYesterday), expectedBaseYesterday), 0);
     checkBucketIdAndRowIdInAcidFile(fs, new Path(new Path(tablePath, partitionYesterday), expectedBaseYesterday), 1);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE), new String[] { AcidUtils.BASE_PREFIX});
   }
 
   @Test
@@ -468,6 +488,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Verify all contents
     List<String> actualData = dataProvider.getAllData(tableName);
     Assert.assertEquals(expectedData, actualData);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
     // Clean up
     dataProvider.dropTable(tableName);
   }
@@ -541,6 +566,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Verify all contents
     List<String> actualData = dataProvider.getAllData(tableName);
     Assert.assertEquals(expectedData, actualData);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
     // Clean up
     dataProvider.dropTable(tableName);
   }
@@ -603,6 +633,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Verify all contents
     List<String> actualData = dataProvider.getAllData(tableName);
     Assert.assertEquals(expectedData, actualData);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
     // Clean up
     dataProvider.dropTable(tableName);
   }
@@ -685,6 +720,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Verify all contents
     List<String> actualData = dataProvider.getAllData(tableName);
     Assert.assertEquals(expectedData, actualData);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
     // Clean up
     dataProvider.dropTable(tableName);
   }
@@ -738,6 +778,11 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     // Verify all contents
     List<String> actualData = dataProvider.getAllData(tableName);
     Assert.assertEquals(expectedData, actualData);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
     // Clean up
     dataProvider.dropTable(tableName);
   }
@@ -788,6 +833,10 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     Assert.assertEquals("Delete delta directories does not match after compaction",
         Collections.singletonList("delete_delta_0000001_0000015_v0000044"), actualDeleteDeltasAfterComp);
 
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX});
+
   }
 
   @Test
@@ -821,7 +870,6 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
       CompactorTestUtil.checkExpectedTxnsPresent(null,
           new Path[] {new Path(table.getSd().getLocation(), "delta_0000001_0000005_v0000009")}, "a,b", "int:string",
           0, 1L, 4L, null, 1);
-
     } finally {
       if (connection != null) {
         connection.close();
@@ -853,6 +901,10 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
     CompactorTestUtil.checkExpectedTxnsPresent(null,
         new Path[] {new Path(table.getSd().getLocation(), "delta_0000001_0000006_v0000009")}, "a,b", "int:string", 0,
         1L, 4L, Lists.newArrayList(5, 6), 1);
+
+    CompactorTestUtilities.checkAcidVersion(fs.listFiles(new Path(table.getSd().getLocation()), true), fs,
+        conf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE),
+        new String[] { AcidUtils.DELTA_PREFIX });
   }
 
   @Test
@@ -868,7 +920,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
         .newArrayList(new CompactorTestUtil.StreamingConnectionOption(false, false),
             new CompactorTestUtil.StreamingConnectionOption(true, false),
             new CompactorTestUtil.StreamingConnectionOption(false, false)));
-    // Now, copact
+    // Now, compact
     CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
     // Find the location of the table
     IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 349eb25..9e99887 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.llap.LlapUtil;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.orc.FileMetadata;
@@ -153,6 +154,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
     // the smallest stripe size would be 5120 rows, which changes the output
     // of some of the tests.)
     private int batchSize = 1000;
+    private boolean isCompaction;
 
     WriterOptions(Properties tableProperties, Configuration conf) {
       super(tableProperties, conf);
@@ -161,6 +163,7 @@ public final class OrcFile extends org.apache.orc.OrcFile {
         LlapProxy.isDaemon()) {
         memory(getThreadLocalOrcLlapMemoryManager(conf));
       }
+      isCompaction = AcidUtils.isCompactionTable(tableProperties);
     }
 
    /**
@@ -349,6 +352,10 @@ public final class OrcFile extends org.apache.orc.OrcFile {
     int getBatchSize() {
       return batchSize;
     }
+
+    boolean isCompaction() {
+      return isCompaction;
+    }
   }
 
   /**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
index 202f78b..c56fb6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
@@ -82,8 +82,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
     public void write(NullWritable nullWritable,
                       OrcSerdeRow row) throws IOException {
       if (writer == null) {
-        options.inspector(row.getInspector());
-        writer = OrcFile.createWriter(path, options);
+        init(row);
       }
       writer.addRow(row.getRow());
     }
@@ -92,8 +91,7 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
     public void write(Writable row) throws IOException {
       OrcSerdeRow serdeRow = (OrcSerdeRow) row;
       if (writer == null) {
-        options.inspector(serdeRow.getInspector());
-        writer = OrcFile.createWriter(path, options);
+        init(serdeRow);
       }
       writer.addRow(serdeRow.getRow());
     }
@@ -121,6 +119,14 @@ public class OrcOutputFormat extends FileOutputFormat<NullWritable, OrcSerdeRow>
       stats.setRowCount(null == writer ? 0 : writer.getNumberOfRows());
       return stats;
     }
+
+    private void init(OrcSerdeRow serdeRow) throws IOException {
+      options.inspector(serdeRow.getInspector());
+      writer = OrcFile.createWriter(path, options);
+      if (options.isCompaction()) {
+        AcidUtils.OrcAcidVersion.setAcidVersionInDataFile(writer);
+      }
+    }
   }
 
   private OrcFile.WriterOptions getOptions(JobConf conf, Properties props) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index e17086f..b60e099 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4206,6 +4206,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
             }
           }
           files = fileStatuses.toArray(new FileStatus[files.length]);
+
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE)) {
+            AcidUtils.OrcAcidVersion.writeVersionFile(destf, destFs);
+          }
         } catch (IOException e) {
           if (null != pool) {
             pool.shutdownNow();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 4043212..c10f822 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -18,16 +18,13 @@
 package org.apache.hadoop.hive.ql;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Optional;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CountDownLatch;
@@ -37,16 +34,18 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.Constants;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
 import org.apache.hadoop.hive.metastore.api.LockState;
 import org.apache.hadoop.hive.metastore.api.LockType;
@@ -67,13 +66,12 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketCodec;
-import org.apache.hadoop.hive.ql.io.orc.OrcFile;
-import org.apache.hadoop.hive.ql.io.orc.Reader;
 import org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtilities;
 import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Ignore;
@@ -1248,7 +1246,19 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     Assert.assertEquals(stringifyValues(expected), r);
   }
   @Test
-  public void testVersioning() throws Exception {
+  public void testVersioningVersionFileEnabled() throws Exception {
+    acidVersionTest(true);
+  }
+
+  @Test
+  public void testVersioningVersionFileDisabled() throws Exception {
+    acidVersionTest(false);
+  }
+
+  private void acidVersionTest(boolean enableVersionFile) throws Exception {
+    boolean originalEnableVersionFile = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, enableVersionFile);
+
     hiveConf.set(MetastoreConf.ConfVars.CREATE_TABLES_AS_ACID.getVarname(), "true");
     // Need to close the thread local Hive object so that configuration change is reflected to HMS.
     Hive.closeCurrent();
@@ -1257,105 +1267,48 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
     int[][] data = {{1, 2}};
     //create 1 delta file bucket_00000
     runStatementOnDriver("insert into T" + makeValuesClause(data));
+    runStatementOnDriver("update T set a=3 where b=2");
 
-    //delete the bucket files so now we have empty delta dirs
-    List<String> rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
     FileSystem fs = FileSystem.get(hiveConf);
-    Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.DELTA_PREFIX));
-    Path  filePath = new Path(rs.get(0));
-    int version = getAcidVersionFromDataFile(filePath, fs);
-    //check it has expected version marker
-    Assert.assertEquals("Unexpected version marker in " + filePath,
-        AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
-
-    //check that delta dir has a version file with expected value
-    filePath = filePath.getParent();
-    Assert.assertTrue(filePath.getName().startsWith(AcidUtils.DELTA_PREFIX));
-    int versionFromMetaFile = getAcidVersionFromMetaFile(filePath, fs);
-    Assert.assertEquals("Unexpected version marker in " + filePath,
-        AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile);
+    RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path(getWarehouseDir(), "t"), true);
+    CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile,
+        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX });
 
-    runStatementOnDriver("insert into T" + makeValuesClause(data));
-    runStatementOnDriver("alter table T compact 'major'");
-    runWorker(hiveConf);
+    runStatementOnDriver("alter table T compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
 
-    //check status of compaction job
+    // Check status of compaction job
     TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
     Assert.assertEquals("Unexpected 0 compaction state",
         TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
     Assert.assertTrue(resp.getCompacts().get(0).getHadoopJobId().startsWith("job_local"));
+    Assert.assertTrue(resp.getCompacts().get(0).getType().equals(CompactionType.MINOR));
 
-    rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
-    Assert.assertTrue(rs != null && rs.size() == 1 && rs.get(0).contains(AcidUtils.BASE_PREFIX));
-
-    filePath = new Path(rs.get(0));
-    version = getAcidVersionFromDataFile(filePath, fs);
-    //check that files produced by compaction still have the version marker
-    Assert.assertEquals("Unexpected version marker in " + filePath,
-        AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
+    // Check the files after minor compaction
+    files = fs.listFiles(new Path(getWarehouseDir(), "t"), true);
+    CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile,
+        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX });
 
-    //check that compacted base dir has a version file with expected value
-    filePath = filePath.getParent();
-    Assert.assertTrue(filePath.getName().startsWith(AcidUtils.BASE_PREFIX));
-    versionFromMetaFile = getAcidVersionFromMetaFile(filePath, fs);
-    Assert.assertEquals("Unexpected version marker in " + filePath,
-        AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile);
-
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, false);
     runStatementOnDriver("insert into T" + makeValuesClause(data));
-    //delete the bucket files so now we have empty delta dirs
-    rs = runStatementOnDriver("select distinct INPUT__FILE__NAME from T");
-    Optional<String> deltaDir = rs.stream().filter(p -> p.contains(AcidUtils.DELTA_PREFIX)).findAny();
-    Assert.assertTrue("Delta dir should be present", deltaDir.isPresent());
-    Assert.assertFalse("Version marker should not exists",
-        fs.exists(AcidUtils.OrcAcidVersion.getVersionFilePath(new Path(deltaDir.get()).getParent())));
-    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, true);
-  }
-
-  private static final Charset UTF8 = Charset.forName("UTF-8");
-  private static final int ORC_ACID_VERSION_DEFAULT = 0;
-  /**
-   * This is smart enough to handle streaming ingest where there could be a
-   * {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX} side file.
-   * @param dataFile - ORC acid data file
-   * @return version property from file if there,
-   *          {@link #ORC_ACID_VERSION_DEFAULT} otherwise
-   */
-  private static int getAcidVersionFromDataFile(Path dataFile, FileSystem fs) throws IOException {
-    FileStatus fileStatus = fs.getFileStatus(dataFile);
-    Reader orcReader = OrcFile.createReader(dataFile,
-        OrcFile.readerOptions(fs.getConf())
-            .filesystem(fs)
-            //make sure to check for side file in case streaming ingest died
-            .maxLength(AcidUtils.getLogicalLength(fs, fileStatus)));
-    if (orcReader.hasMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)) {
-      char[] versionChar =
-          UTF8.decode(orcReader.getMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)).array();
-      String version = new String(versionChar);
-      return Integer.valueOf(version);
-    }
-    return ORC_ACID_VERSION_DEFAULT;
-  }
 
-  private static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs)
-      throws IOException {
-    Path formatFile = AcidUtils.OrcAcidVersion.getVersionFilePath(deltaOrBaseDir);
-    try (FSDataInputStream inputStream = fs.open(formatFile)) {
-      byte[] bytes = new byte[1];
-      int read = inputStream.read(bytes);
-      if (read != -1) {
-        String version = new String(bytes, UTF8);
-        return Integer.valueOf(version);
-      }
-      return ORC_ACID_VERSION_DEFAULT;
-    } catch (FileNotFoundException fnf) {
-      LOG.debug(formatFile + " not found, returning default: " + ORC_ACID_VERSION_DEFAULT);
-      return ORC_ACID_VERSION_DEFAULT;
-    } catch(IOException ex) {
-      LOG.error(formatFile + " is unreadable due to: " + ex.getMessage(), ex);
-      throw ex;
-    }
+    runStatementOnDriver("alter table T compact 'major'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    // Check status of compaction job
+    txnHandler = TxnUtils.getTxnStore(hiveConf);
+    resp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
+    Assert.assertEquals("Unexpected 1 compaction state",
+        TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(1).getState());
+    Assert.assertTrue(resp.getCompacts().get(1).getHadoopJobId().startsWith("job_local"));
+
+    // Check the files after major compaction
+    files = fs.listFiles(new Path(getWarehouseDir(), "t"), true);
+    CompactorTestUtilities.checkAcidVersion(files, fs, enableVersionFile,
+        new String[] { AcidUtils.DELTA_PREFIX, AcidUtils.DELETE_DELTA_PREFIX, AcidUtils.BASE_PREFIX });
+
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_WRITE_ACID_VERSION_FILE, originalEnableVersionFile);
   }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtilities.java
new file mode 100644
index 0000000..1a080b2
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTestUtilities.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.Reader;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+public class CompactorTestUtilities {
+  private static final Logger LOG = LoggerFactory.getLogger(CompactorTestUtilities.class);
+
+  private static final Charset UTF8 = Charset.forName("UTF-8");
+  private static final int ORC_ACID_VERSION_DEFAULT = 0;
+
+  /**
+   * This is smart enough to handle streaming ingest where there could be a
+   * {@link AcidUtils#DELTA_SIDE_FILE_SUFFIX} side file.
+   * @param dataFile - ORC acid data file
+   * @return version property from file if there,
+   *          {@link #ORC_ACID_VERSION_DEFAULT} otherwise
+   */
+  private static int getAcidVersionFromDataFile(Path dataFile, FileSystem fs) throws IOException {
+    FileStatus fileStatus = fs.getFileStatus(dataFile);
+    Reader orcReader = OrcFile.createReader(dataFile,
+        OrcFile.readerOptions(fs.getConf())
+            .filesystem(fs)
+            //make sure to check for side file in case streaming ingest died
+            .maxLength(AcidUtils.getLogicalLength(fs, fileStatus)));
+    if (orcReader.hasMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)) {
+      char[] versionChar =
+          UTF8.decode(orcReader.getMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)).array();
+      String version = new String(versionChar);
+      return Integer.valueOf(version);
+    }
+    return ORC_ACID_VERSION_DEFAULT;
+  }
+
+  private static int getAcidVersionFromMetaFile(Path deltaOrBaseDir, FileSystem fs)
+      throws IOException {
+    Path formatFile = AcidUtils.OrcAcidVersion.getVersionFilePath(deltaOrBaseDir);
+    try (FSDataInputStream inputStream = fs.open(formatFile)) {
+      byte[] bytes = new byte[1];
+      int read = inputStream.read(bytes);
+      if (read != -1) {
+        String version = new String(bytes, UTF8);
+        return Integer.valueOf(version);
+      }
+      return ORC_ACID_VERSION_DEFAULT;
+    } catch (FileNotFoundException fnf) {
+      LOG.debug(formatFile + " not found, returning default: " + ORC_ACID_VERSION_DEFAULT);
+      return ORC_ACID_VERSION_DEFAULT;
+    } catch(IOException ex) {
+      LOG.error(formatFile + " is unreadable due to: " + ex.getMessage(), ex);
+      throw ex;
+    }
+  }
+
+  public static void checkAcidVersion(RemoteIterator<LocatedFileStatus> files, FileSystem fs, boolean hasVersionFile,
+      String[] expectedTypes) throws IOException
+  {
+    Set<String> foundPrefixes = new HashSet<>(expectedTypes.length);
+    Set<String> foundDirectories = new HashSet<>(expectedTypes.length);
+    while(files.hasNext()) {
+      LocatedFileStatus file = files.next();
+      Path path = file.getPath();
+      if (!path.getName().equals(AcidUtils.OrcAcidVersion.ACID_FORMAT)) {
+        int version = getAcidVersionFromDataFile(path, fs);
+        //check that files produced by compaction still have the version marker
+        Assert.assertEquals("Unexpected version marker in " + path.getName(),
+            AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, version);
+      }
+      Path parent = path.getParent();
+      if (!foundDirectories.contains(parent)) {
+        if (parent.getName().startsWith(AcidUtils.BASE_PREFIX)) {
+          foundPrefixes.add(AcidUtils.BASE_PREFIX);
+        } else if (parent.getName().startsWith(AcidUtils.DELTA_PREFIX)) {
+          foundPrefixes.add(AcidUtils.DELTA_PREFIX);
+        } else if (parent.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+          foundPrefixes.add(AcidUtils.DELETE_DELTA_PREFIX);
+        }
+        // It is a directory
+        if (hasVersionFile) {
+          Assert.assertTrue("Version marker should exists",
+              fs.exists(AcidUtils.OrcAcidVersion.getVersionFilePath(parent)));
+          int versionFromMetaFile = getAcidVersionFromMetaFile(parent, fs);
+          Assert.assertEquals("Unexpected version marker in " + parent,
+              AcidUtils.OrcAcidVersion.ORC_ACID_VERSION, versionFromMetaFile);
+        } else {
+          Assert.assertFalse("Version marker should not exists",
+              fs.exists(AcidUtils.OrcAcidVersion.getVersionFilePath(parent)));
+        }
+      }
+    }
+    Assert.assertEquals("Did not found all types of directories", new HashSet<>(Arrays.asList(expectedTypes)),
+        foundPrefixes);
+  }
+}
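
For completeness, here is a standalone way to read both markers back outside the new CompactorTestUtilities helper. This is an illustrative sketch, not part of the commit: the package and class names are placeholders, and it relies only on APIs already exercised in the patch above.

package org.example.sketch;                      // placeholder package, illustration only

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;

public class AcidVersionReadSketch {

  /** Returns the version stamped into an ORC data file's metadata, or -1 if absent. */
  static int versionInDataFile(FileSystem fs, Path dataFile) throws IOException {
    FileStatus status = fs.getFileStatus(dataFile);
    Reader reader = OrcFile.createReader(dataFile,
        OrcFile.readerOptions(fs.getConf())
            .filesystem(fs)
            // account for a possible streaming-ingest side file, as the test utility does
            .maxLength(AcidUtils.getLogicalLength(fs, status)));
    if (!reader.hasMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)) {
      return -1;
    }
    String version = StandardCharsets.UTF_8
        .decode(reader.getMetadataValue(AcidUtils.OrcAcidVersion.ACID_VERSION_KEY)).toString();
    return Integer.parseInt(version.trim());
  }

  /** True if a base/delta directory carries the _orc_acid_version side file. */
  static boolean hasVersionSideFile(FileSystem fs, Path deltaOrBaseDir) throws IOException {
    return fs.exists(AcidUtils.OrcAcidVersion.getVersionFilePath(deltaOrBaseDir));
  }
}

Both checks mirror what CompactorTestUtilities.checkAcidVersion asserts per data file and per base/delta directory in the tests above.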