You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2020/02/17 13:54:27 UTC
[hive] branch master updated: HIVE-22610: Minor compaction for MM
(insert-only) tables (Karen Coppage, reviewed by Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 9251663 HIVE-22610: Minor compaction for MM (insert-only) tables (Karen Coppage, reviewed by Laszlo Pinter)
9251663 is described below
commit 92516631ab39f39df5d0692f98ac32c2cd320997
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Mon Feb 17 14:29:02 2020 +0100
HIVE-22610: Minor compaction for MM (insert-only) tables (Karen Coppage, reviewed by Laszlo Pinter)
---
.../hive/ql/txn/compactor/CompactorOnTezTest.java | 240 +++++++++
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 216 +++-----
.../ql/txn/compactor/TestMmCompactorOnTez.java | 564 +++++++++++++++++++++
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 15 -
.../ql/txn/compactor/MmMajorQueryCompactor.java | 151 +-----
.../ql/txn/compactor/MmMinorQueryCompactor.java | 211 ++++++++
.../ql/txn/compactor/MmQueryCompactorUtils.java | 200 ++++++++
.../ql/txn/compactor/QueryCompactorFactory.java | 6 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 37 --
.../hadoop/hive/ql/TestTxnCommandsForMmTable.java | 126 -----
10 files changed, 1295 insertions(+), 471 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
new file mode 100644
index 0000000..78174f3
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults;
+import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
+
+/**
+ * Superclass for TestCrudCompactorOnTez and TestMmCompactorOnTez.
+ *
+ * <p>Before every test it builds a fresh HiveConf (Tez in local mode, a {@code file:///} warehouse
+ * under a per-run temporary directory), a freshly prepared transaction database, a metastore client,
+ * a driver and a CLI session. It also provides {@link TestDataProvider} for creating tables and
+ * loading test data.
+ */
+public class CompactorOnTezTest {
+  // Together with the current time, this random suffix makes the scratch directory name unique per run.
+  private static final AtomicInteger RANDOM_INT = new AtomicInteger(new Random().nextInt());
+  // Named after this base class rather than one of its subclasses: the static field is shared by
+  // every subclass, so naming it after a single subclass was misleading.
+  private static final String TEST_DATA_DIR = new File(
+      System.getProperty("java.io.tmpdir") + File.separator + CompactorOnTezTest.class
+          .getCanonicalName() + "-" + System.currentTimeMillis() + "_" + RANDOM_INT
+          .getAndIncrement()).getPath().replaceAll("\\\\", "/");
+  private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
+  protected HiveConf conf;
+  protected IMetaStoreClient msClient;
+  protected IDriver driver;
+
+  /**
+   * Recreates an empty warehouse directory and builds a new conf, metastore client, driver and
+   * CLI session. This runs before every test, so each test gets its own conf and driver objects.
+   *
+   * @throws RuntimeException if the warehouse directory cannot be created
+   * @throws Exception if the transaction database cannot be reset/prepared or a client cannot be created
+   */
+  @Before
+  public void setup() throws Exception {
+    File f = new File(TEST_WAREHOUSE_DIR);
+    if (f.exists()) {
+      FileUtil.fullyDelete(f);
+    }
+    if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
+      throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
+    }
+    HiveConf hiveConf = new HiveConf(this.getClass());
+    hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
+    hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
+    hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
+    hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
+    TxnDbUtil.setConfValues(hiveConf);
+    TxnDbUtil.cleanDb(hiveConf);
+    TxnDbUtil.prepDb(hiveConf);
+    conf = hiveConf;
+    // Use tez as execution engine for this test class
+    setupTez(conf);
+    msClient = new HiveMetaStoreClient(conf);
+    driver = DriverFactory.newDriver(conf);
+    SessionState.start(new CliSessionState(conf));
+  }
+
+  /**
+   * Switches the given conf to Tez in local mode. It uses small (128 MB) AM/container sizes and
+   * keeps Tez staging and the Hive user install dir under the per-run test data directory.
+   */
+  private void setupTez(HiveConf conf) {
+    conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
+    conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
+    conf.set("tez.am.resource.memory.mb", "128");
+    conf.set("tez.am.dag.scheduler.class",
+        "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
+    conf.setBoolean("tez.local.mode", true);
+    conf.set("fs.defaultFS", "file:///");
+    conf.setBoolean("tez.runtime.optimize.local.fetch", true);
+    conf.set("tez.staging-dir", TEST_DATA_DIR);
+    conf.setBoolean("tez.ignore.lib.uris", true);
+    conf.set("hive.tez.container.size", "128");
+    conf.setBoolean("hive.merge.tezfiles", false);
+    conf.setBoolean("hive.in.tez.test", true);
+  }
+
+  /**
+   * Closes the metastore client and the driver opened in {@link #setup()}, then drops the conf.
+   */
+  @After
+  public void tearDown() {
+    if (msClient != null) {
+      msClient.close();
+    }
+    if (driver != null) {
+      driver.close();
+    }
+    conf = null;
+  }
+
+  /**
+   * Creates test tables and loads/reads test data through the enclosing test's {@code driver}.
+   */
+  protected class TestDataProvider {
+
+    /** Creates a full ACID ORC table; see {@link #createTable}. */
+    void createFullAcidTable(String tblName, boolean isPartitioned, boolean isBucketed)
+        throws Exception {
+      createTable(tblName, isPartitioned, isBucketed, false, "orc");
+    }
+
+    /** Creates an insert-only (MM) ORC table; see {@link #createTable}. */
+    void createMmTable(String tblName, boolean isPartitioned, boolean isBucketed)
+        throws Exception {
+      createMmTable(tblName, isPartitioned, isBucketed, "orc");
+    }
+
+    /** Creates an insert-only (MM) table stored as {@code fileFormat}; see {@link #createTable}. */
+    void createMmTable(String tblName, boolean isPartitioned, boolean isBucketed, String fileFormat)
+        throws Exception {
+      createTable(tblName, isPartitioned, isBucketed, true, fileFormat);
+    }
+
+    /**
+     * Drops {@code tblName} if it exists and recreates it with columns {@code (a string, b int)}.
+     *
+     * @param isPartitioned add {@code partitioned by (ds string)}
+     * @param isBucketed add {@code clustered by (a) into 2 buckets}
+     * @param insertOnly true for {@code transactional_properties='insert_only'},
+     *     false for {@code 'default'} (full ACID)
+     * @param fileFormat value used in the {@code stored as} clause
+     */
+    private void createTable(String tblName, boolean isPartitioned, boolean isBucketed,
+        boolean insertOnly, String fileFormat) throws Exception {
+
+      executeStatementOnDriver("drop table if exists " + tblName, driver);
+      StringBuilder query = new StringBuilder();
+      query.append("create table ").append(tblName).append(" (a string, b int)");
+      if (isPartitioned) {
+        query.append(" partitioned by (ds string)");
+      }
+      if (isBucketed) {
+        query.append(" clustered by (a) into 2 buckets");
+      }
+      query.append(" stored as ").append(fileFormat);
+      query.append(" TBLPROPERTIES('transactional'='true',");
+      if (insertOnly) {
+        query.append(" 'transactional_properties'='insert_only')");
+      } else {
+        query.append(" 'transactional_properties'='default')");
+      }
+      executeStatementOnDriver(query.toString(), driver);
+    }
+
+    /**
+     * Loads a partitioned table (columns a, b, ds) in 5 txns: insert, insert, delete where b = 2,
+     * insert, delete where a = '1'.
+     */
+    void insertTestDataPartitioned(String tblName) throws Exception {
+      executeStatementOnDriver("insert into " + tblName
+          + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow'),"
+          + "('2',3, 'yesterday'),('2',4, 'today')", driver);
+      executeStatementOnDriver("insert into " + tblName
+          + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today'),"
+          + "('4',3, 'tomorrow'),('4',4, 'today')", driver);
+      executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+      executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday'),"
+          + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver);
+      executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver);
+    }
+
+    /**
+     * Loads a partitioned insert-only table (columns a, b, ds) in 3 txns, all inserts.
+     */
+    protected void insertMmTestDataPartitioned(String tblName) throws Exception {
+      executeStatementOnDriver("insert into " + tblName
+          + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow'),"
+          + "('2',3, 'yesterday'),('2',4, 'today')", driver);
+      executeStatementOnDriver("insert into " + tblName
+          + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today'),"
+          + "('4',3, 'tomorrow'),('4',4, 'today')", driver);
+      executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday'),"
+          + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver);
+    }
+
+    /**
+     * Loads an unpartitioned table (columns a, b) in 5 txns: insert, insert, delete where b = 2,
+     * insert, delete where a = '1'.
+     */
+    protected void insertTestData(String tblName) throws Exception {
+      executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)",
+          driver);
+      executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)",
+          driver);
+      executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
+      executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)",
+          driver);
+      executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver);
+    }
+
+    /**
+     * Loads an unpartitioned insert-only table (columns a, b) in 3 txns, all inserts.
+     */
+    protected void insertMmTestData(String tblName) throws Exception {
+      executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)",
+          driver);
+      executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)",
+          driver);
+      executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)",
+          driver);
+    }
+
+    /**
+     * Runs {@code iterations + ceil(iterations / 2)} txns. First it inserts one row
+     * {@code (i, i)} per iteration. Then it deletes the rows whose b is an even number below
+     * {@code iterations}, one delete per txn.
+     */
+    protected void insertTestData(String tblName, int iterations) throws Exception {
+      for (int i = 0; i < iterations; i++) {
+        executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver);
+      }
+      for (int i = 0; i < iterations; i += 2) {
+        executeStatementOnDriver("delete from " + tblName + " where b = " + i, driver);
+      }
+    }
+
+    /**
+     * Runs {@code iterations} txns, each inserting one row {@code (i, i)}.
+     */
+    protected void insertMmTestData(String tblName, int iterations) throws Exception {
+      for (int i = 0; i < iterations; i++) {
+        executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver);
+      }
+    }
+
+    /** Returns every row of {@code tblName}, sorted in natural string order. */
+    protected List<String> getAllData(String tblName) throws Exception {
+      List<String> result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver);
+      Collections.sort(result);
+      return result;
+    }
+
+    /**
+     * Returns ROW__ID plus all columns for rows whose ROW__ID.bucketid equals {@code bucketId},
+     * ordered by ROW__ID. {@code bucketId} is spliced verbatim into the query.
+     */
+    protected List<String> getBucketData(String tblName, String bucketId) throws Exception {
+      return executeStatementOnDriverAndReturnResults(
+          "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID", driver);
+    }
+
+    /** Drops {@code tblName}; fails if it does not exist. */
+    protected void dropTable(String tblName) throws Exception {
+      executeStatementOnDriver("drop table " + tblName, driver);
+    }
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 4c01311..9659a3f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -17,21 +17,16 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -44,20 +39,14 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.DriverFactory;
-import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StrictDelimitedInputWriter;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
@@ -66,68 +55,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@SuppressWarnings("deprecation")
-public class TestCrudCompactorOnTez {
- private static final AtomicInteger salt = new AtomicInteger(new Random().nextInt());
- private static final String TEST_DATA_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator
- + TestCrudCompactorOnTez.class.getCanonicalName() + "-" + System.currentTimeMillis() + "_" + salt
- .getAndIncrement()).getPath().replaceAll("\\\\", "/");
- private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
- private HiveConf conf;
- private IMetaStoreClient msClient;
- private IDriver driver;
-
- @Before
- // Note: we create a new conf and driver object before every test
- public void setup() throws Exception {
- File f = new File(TEST_WAREHOUSE_DIR);
- if (f.exists()) {
- FileUtil.fullyDelete(f);
- }
- if (!(new File(TEST_WAREHOUSE_DIR).mkdirs())) {
- throw new RuntimeException("Could not create " + TEST_WAREHOUSE_DIR);
- }
- HiveConf hiveConf = new HiveConf(this.getClass());
- hiveConf.setVar(HiveConf.ConfVars.PREEXECHOOKS, "");
- hiveConf.setVar(HiveConf.ConfVars.POSTEXECHOOKS, "");
- hiveConf.setVar(HiveConf.ConfVars.METASTOREWAREHOUSE, TEST_WAREHOUSE_DIR);
- hiveConf.setVar(HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
- hiveConf.setVar(HiveConf.ConfVars.HIVEFETCHTASKCONVERSION, "none");
- TxnDbUtil.setConfValues(hiveConf);
- TxnDbUtil.cleanDb(hiveConf);
- TxnDbUtil.prepDb(hiveConf);
- conf = hiveConf;
- // Use tez as execution engine for this test class
- setupTez(conf);
- msClient = new HiveMetaStoreClient(conf);
- driver = DriverFactory.newDriver(conf);
- SessionState.start(new CliSessionState(conf));
- }
-
- private void setupTez(HiveConf conf) {
- conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
- conf.setVar(HiveConf.ConfVars.HIVE_USER_INSTALL_DIR, TEST_DATA_DIR);
- conf.set("tez.am.resource.memory.mb", "128");
- conf.set("tez.am.dag.scheduler.class", "org.apache.tez.dag.app.dag.impl.DAGSchedulerNaturalOrderControlled");
- conf.setBoolean("tez.local.mode", true);
- conf.set("fs.defaultFS", "file:///");
- conf.setBoolean("tez.runtime.optimize.local.fetch", true);
- conf.set("tez.staging-dir", TEST_DATA_DIR);
- conf.setBoolean("tez.ignore.lib.uris", true);
- conf.set("hive.tez.container.size", "128");
- conf.setBoolean("hive.merge.tezfiles", false);
- conf.setBoolean("hive.in.tez.test", true);
- }
-
- @After
- public void tearDown() {
- if (msClient != null) {
- msClient.close();
- }
- if (driver != null) {
- driver.close();
- }
- conf = null;
- }
+public class TestCrudCompactorOnTez extends CompactorOnTezTest {
@Test
public void testMajorCompaction() throws Exception {
@@ -217,7 +145,7 @@ public class TestCrudCompactorOnTez {
String tableName = "testMinorCompaction";
// Create test table
TestDataProvider dataProvider = new TestDataProvider();
- dataProvider.createTable(tableName, false, false);
+ dataProvider.createFullAcidTable(tableName, false, false);
// Find the location of the table
IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
Table table = msClient.getTable(dbName, tableName);
@@ -287,7 +215,7 @@ public class TestCrudCompactorOnTez {
String tableName = "testMinorCompaction";
// Create test table
TestDataProvider dataProvider = new TestDataProvider();
- dataProvider.createTable(tableName, false, true);
+ dataProvider.createFullAcidTable(tableName, false, true);
// Find the location of the table
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
Table table = metaStoreClient.getTable(dbName, tableName);
@@ -360,7 +288,7 @@ public class TestCrudCompactorOnTez {
String tableName = "testMinorCompaction";
// Create test table
TestDataProvider dataProvider = new TestDataProvider();
- dataProvider.createTable(tableName, true, false);
+ dataProvider.createFullAcidTable(tableName, true, false);
// Find the location of the table
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
Table table = metaStoreClient.getTable(dbName, tableName);
@@ -440,7 +368,7 @@ public class TestCrudCompactorOnTez {
String tableName = "testMinorCompaction";
// Create test table
TestDataProvider dataProvider = new TestDataProvider();
- dataProvider.createTable(tableName, true, true);
+ dataProvider.createFullAcidTable(tableName, true, true);
// Find the location of the table
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
Table table = metaStoreClient.getTable(dbName, tableName);
@@ -522,7 +450,7 @@ public class TestCrudCompactorOnTez {
String tableName = "testMinorCompaction";
// Create test table
TestDataProvider dataProvider = new TestDataProvider();
- dataProvider.createTable(tableName, false, false);
+ dataProvider.createFullAcidTable(tableName, false, false);
// Find the location of the table
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
Table table = metaStoreClient.getTable(dbName, tableName);
@@ -577,7 +505,7 @@ public class TestCrudCompactorOnTez {
String tableName = "testMinorCompaction";
// Create test table
TestDataProvider dataProvider = new TestDataProvider();
- dataProvider.createTable(tableName, false, true);
+ dataProvider.createFullAcidTable(tableName, false, true);
// Find the location of the table
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
Table table = metaStoreClient.getTable(dbName, tableName);
@@ -757,7 +685,7 @@ public class TestCrudCompactorOnTez {
String tableName = "testMinorCompaction";
// Create test table
TestDataProvider dataProvider = new TestDataProvider();
- dataProvider.createTable(tableName, false, false);
+ dataProvider.createFullAcidTable(tableName, false, false);
// Find the location of the table
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
Table table = metaStoreClient.getTable(dbName, tableName);
@@ -786,6 +714,10 @@ public class TestCrudCompactorOnTez {
// Verify all contents
List<String> actualData = dataProvider.getAllData(tableName);
Assert.assertEquals(expectedData, actualData);
+ // Insert another round of test data
+ dataProvider.insertTestData(tableName);
+ expectedData = dataProvider.getAllData(tableName);
+ Collections.sort(expectedData);
// Run a compaction
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
// Clean up resources
@@ -796,7 +728,7 @@ public class TestCrudCompactorOnTez {
Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState());
// Verify base directory after compaction
Assert.assertEquals("Base directory does not match after major compaction",
- Collections.singletonList("base_0000005_v0000023"),
+ Collections.singletonList("base_0000010_v0000029"),
CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
// Verify all contents
actualData = dataProvider.getAllData(tableName);
@@ -804,6 +736,62 @@ public class TestCrudCompactorOnTez {
}
@Test
+  public void testMinorCompactionAfterMajor() throws Exception {
+    // Major-compact a full ACID table and write another round of data on top of the new base.
+    // Then minor-compact and check that the base is untouched, the deltas are merged into a single
+    // delta directory, and the table contents are unchanged.
+    String dbName = "default";
+    String tableName = "testMinorCompaction";
+    // Create test table
+    TestDataProvider dataProvider = new TestDataProvider();
+    dataProvider.createFullAcidTable(tableName, false, false);
+    // Find the location of the table. Reuse the client opened in setup() and closed in tearDown()
+    // instead of creating a new HiveMetaStoreClient that would never be closed.
+    Table table = msClient.getTable(dbName, tableName);
+    FileSystem fs = FileSystem.get(conf);
+    // Insert test data into test table
+    dataProvider.insertTestData(tableName);
+    // Get all data before compaction is run
+    List<String> expectedData = dataProvider.getAllData(tableName);
+    Collections.sort(expectedData);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
+    // Clean up resources
+    CompactorTestUtil.runCleaner(conf);
+    // Only 1 compaction should be in the response queue with succeeded state
+    List<ShowCompactResponseElement> compacts =
+        TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain one element", 1, compacts.size());
+    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(0).getState());
+    // Verify base directory after compaction
+    Assert.assertEquals("Base directory does not match after major compaction",
+        Collections.singletonList("base_0000005_v0000009"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+    // Verify all contents
+    List<String> actualData = dataProvider.getAllData(tableName);
+    Assert.assertEquals(expectedData, actualData);
+    // Insert another round of test data
+    dataProvider.insertTestData(tableName);
+    expectedData = dataProvider.getAllData(tableName);
+    Collections.sort(expectedData);
+    // Run a compaction
+    CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+    // Clean up resources
+    CompactorTestUtil.runCleaner(conf);
+    // Two compactions should be in the response queue; the one at index 1 must have succeeded
+    compacts = TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals("Completed compaction queue must contain two elements", 2, compacts.size());
+    Assert.assertEquals("Compaction state is not succeeded", "succeeded", compacts.get(1).getState());
+    // The minor compaction must leave the base written by the major compaction in place
+    Assert.assertEquals("Base directory does not match after minor compaction",
+        Collections.singletonList("base_0000005_v0000009"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+    Assert.assertEquals("Delta directories do not match after minor compaction",
+        Collections.singletonList("delta_0000001_0000010_v0000020"),
+        CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+    // Verify all contents
+    actualData = dataProvider.getAllData(tableName);
+    Assert.assertEquals(expectedData, actualData);
+  }
+
+ @Test
public void testMinorCompactionWhileStreamingWithSplitUpdate() throws Exception {
String dbName = "default";
String tableName = "testMinorCompaction";
@@ -967,71 +955,5 @@ public class TestCrudCompactorOnTez {
hiveConf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "none");
qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries);
Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
-
- }
-
- private class TestDataProvider {
-
- private void createTable(String tblName, boolean isPartitioned, boolean isBucketed) throws Exception {
- executeStatementOnDriver("drop table if exists " + tblName, driver);
- StringBuilder query = new StringBuilder();
- query.append("create table ").append(tblName).append(" (a string, b int)");
- if (isPartitioned) {
- query.append(" partitioned by (ds string)");
- }
- if (isBucketed) {
- query.append(" clustered by (a) into 2 buckets");
- }
- query.append(" stored as ORC TBLPROPERTIES('transactional'='true'," + " 'transactional_properties'='default')");
- executeStatementOnDriver(query.toString(), driver);
- }
-
- private void insertTestDataPartitioned(String tblName) throws Exception {
- executeStatementOnDriver("insert into " + tblName
- + " values('1',2, 'today'),('1',3, 'today'),('1',4, 'yesterday'),('2',2, 'tomorrow'),"
- + "('2',3, 'yesterday'),('2',4, 'today')", driver);
- executeStatementOnDriver("insert into " + tblName
- + " values('3',2, 'tomorrow'),('3',3, 'today'),('3',4, 'yesterday'),('4',2, 'today'),"
- + "('4',3, 'tomorrow'),('4',4, 'today')", driver);
- executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
- executeStatementOnDriver("insert into " + tblName + " values('5',2, 'yesterday'),('5',3, 'yesterday'),"
- + "('5',4, 'today'),('6',2, 'today'),('6',3, 'today'),('6',4, 'today')", driver);
- executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver);
- }
-
- private void insertTestData(String tblName) throws Exception {
- executeStatementOnDriver("insert into " + tblName + " values('1',2),('1',3),('1',4),('2',2),('2',3),('2',4)",
- driver);
- executeStatementOnDriver("insert into " + tblName + " values('3',2),('3',3),('3',4),('4',2),('4',3),('4',4)",
- driver);
- executeStatementOnDriver("delete from " + tblName + " where b = 2", driver);
- executeStatementOnDriver("insert into " + tblName + " values('5',2),('5',3),('5',4),('6',2),('6',3),('6',4)",
- driver);
- executeStatementOnDriver("delete from " + tblName + " where a = '1'", driver);
- }
-
- private void insertTestData(String tblName, int iterations) throws Exception {
- for (int i = 0; i < iterations; i++) {
- executeStatementOnDriver("insert into " + tblName + " values('" + i + "'," + i + ")", driver);
- }
- for (int i = 0; i < iterations; i += 2) {
- executeStatementOnDriver("delete from " + tblName + " where b = " + i, driver);
- }
- }
-
- private List<String> getAllData(String tblName) throws Exception {
- List<String> result = executeStatementOnDriverAndReturnResults("select * from " + tblName, driver);
- Collections.sort(result);
- return result;
- }
-
- private List<String> getBucketData(String tblName, String bucketId) throws Exception {
- return executeStatementOnDriverAndReturnResults(
- "select ROW__ID, * from " + tblName + " where ROW__ID.bucketid = " + bucketId + " order by ROW__ID", driver);
- }
-
- private void dropTable(String tblName) throws Exception {
- executeStatementOnDriver("drop table " + tblName, driver);
- }
}
}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
new file mode 100644
index 0000000..074430c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
+
+/**
+ * Test functionality of MmMinorQueryCompactor,.
+ */
+public class TestMmCompactorOnTez extends CompactorOnTezTest {
+
+ @Test public void testMmMinorCompactionNotPartitionedWithoutBuckets() throws Exception {
+ String dbName = "default";
+ String tableName = "testMmMinorCompaction";
+ // Create test table
+ TestDataProvider testDataProvider = new TestCrudCompactorOnTez.TestDataProvider();
+ testDataProvider.createMmTable(tableName, false, false);
+ // Find the location of the table
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ testDataProvider.insertMmTestData(tableName);
+ // Get all data before compaction is run
+ List<String> expectedData = testDataProvider.getAllData(tableName);
+ // Verify deltas
+ Assert.assertEquals("Delta directories does not match", Arrays
+ .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+ "delta_0000003_0000003_0000"),
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(1);
+ // Verify delta directories after compaction
+ List<String> actualDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+ Assert.assertEquals("Delta directories does not match after compaction",
+ Collections.singletonList("delta_0000001_0000003_v0000007"), actualDeltasAfterComp);
+ // Verify bucket files in delta dirs
+ List<String> expectedBucketFiles = Collections.singletonList("000000_0");
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+ verifyAllContents(tableName, testDataProvider, expectedData);
+ // Clean up
+ testDataProvider.dropTable(tableName);
+ }
+
+ @Test public void testMmMinorCompactionNotPartitionedWithBuckets() throws Exception {
+ String dbName = "default";
+ String tableName = "testMmMinorCompaction";
+ // expected name of the delta dir that will be created with minor compaction
+ String newDeltaName = "delta_0000001_0000003_v0000007";
+ // Create test table
+ TestDataProvider testDataProvider = new TestDataProvider();
+ testDataProvider.createMmTable(tableName, false, true);
+ // Find the location of the table
+ IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+ Table table = metaStoreClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ testDataProvider.insertMmTestData(tableName);
+ // Get all data before compaction is run
+ List<String> expectedData = testDataProvider.getAllData(tableName);
+ // Verify deltas
+ Assert.assertEquals("Delta directories does not match", Arrays
+ .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+ "delta_0000003_0000003_0000"),
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(1);
+ // Verify delta directories after compaction
+ List<String> actualDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+ Assert.assertEquals("Delta directories does not match after compaction",
+ Collections.singletonList(newDeltaName), actualDeltasAfterComp);
+ // Verify number of files in directory
+ FileStatus[] files = fs.listStatus(new Path(table.getSd().getLocation(), newDeltaName),
+ AcidUtils.hiddenFileFilter);
+ Assert.assertEquals("Incorrect number of bucket files", 2, files.length);
+ // Verify bucket files in delta dirs
+ List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0");
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+ verifyAllContents(tableName, testDataProvider, expectedData);
+ // Clean up
+ testDataProvider.dropTable(tableName);
+ }
+
+ @Test public void testMmMinorCompactionPartitionedWithoutBuckets() throws Exception {
+ String dbName = "default";
+ String tableName = "testMmMinorCompaction";
+ // Create test table
+ TestDataProvider dataProvider = new TestDataProvider();
+ dataProvider.createMmTable(tableName, true, false);
+ // Find the location of the table
+ IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+ Table table = metaStoreClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ dataProvider.insertMmTestDataPartitioned(tableName);
+ // Get all data before compaction is run
+ List<String> expectedData = dataProvider.getAllData(tableName);
+ // Verify deltas
+ String partitionToday = "ds=today";
+ String partitionTomorrow = "ds=tomorrow";
+ String partitionYesterday = "ds=yesterday";
+ Assert.assertEquals("Delta directories does not match", Arrays
+ .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+ "delta_0000003_0000003_0000"), CompactorTestUtil
+ .getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday));
+ // Run a compaction
+ CompactorTestUtil
+ .runCompaction(conf, dbName, tableName, CompactionType.MINOR, true, partitionToday,
+ partitionTomorrow, partitionYesterday);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(3);
+ // Verify delta directories after compaction in each partition
+ List<String> actualDeltasAfterCompPartToday =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday);
+ Assert.assertEquals("Delta directories does not match after compaction",
+ Collections.singletonList("delta_0000001_0000003_v0000007"),
+ actualDeltasAfterCompPartToday);
+ // Verify bucket files in delta dirs
+ List<String> expectedBucketFiles = Collections.singletonList("000000_0");
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+ CompactorTestUtil
+ .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
+ verifyAllContents(tableName, dataProvider, expectedData);
+ // Clean up
+ dataProvider.dropTable(tableName);
+ }
+
+ @Test public void testMmMinorCompactionPartitionedWithBucketsOrc() throws Exception {
+ testMmMinorCompactionPartitionedWithBuckets("orc");
+ }
+
+ @Test public void testMmMinorCompactionPartitionedWithBucketsParquet() throws Exception {
+ testMmMinorCompactionPartitionedWithBuckets("parquet");
+ }
+
+ @Test public void testMmMinorCompactionPartitionedWithBucketsAvro() throws Exception {
+ testMmMinorCompactionPartitionedWithBuckets("avro");
+ }
+
+ @Test public void testMmMinorCompactionPartitionedWithBucketsTextFile() throws Exception {
+ testMmMinorCompactionPartitionedWithBuckets("textfile");
+ }
+
+ @Test public void testMmMinorCompactionPartitionedWithBucketsSequenceFile() throws Exception {
+ testMmMinorCompactionPartitionedWithBuckets("sequencefile");
+ }
+
+ @Test public void testMmMinorCompactionPartitionedWithBucketsRcFile() throws Exception {
+ testMmMinorCompactionPartitionedWithBuckets("RcFile");
+ }
+
+ @Test public void testMmMinorCompactionPartitionedWithBucketsJsonFile() throws Exception {
+ testMmMinorCompactionPartitionedWithBuckets("JsonFile");
+ }
+
+ private void testMmMinorCompactionPartitionedWithBuckets(String fileFormat) throws Exception {
+ String dbName = "default";
+ String tableName = "testMmMinorCompaction";
+ // Create test table
+ TestDataProvider dataProvider = new TestDataProvider();
+ dataProvider.createMmTable(tableName, true, true, fileFormat);
+ // Find the location of the table
+ IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+ Table table = metaStoreClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ dataProvider.insertMmTestDataPartitioned(tableName);
+ // Get all data before compaction is run
+ List<String> expectedData = dataProvider.getAllData(tableName);
+ // Verify deltas
+ String partitionToday = "ds=today";
+ String partitionTomorrow = "ds=tomorrow";
+ String partitionYesterday = "ds=yesterday";
+ Assert.assertEquals("Delta directories does not match", Arrays
+ .asList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+ "delta_0000003_0000003_0000"), CompactorTestUtil
+ .getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday));
+ // Run a compaction
+ CompactorTestUtil
+ .runCompaction(conf, dbName, tableName, CompactionType.MINOR, true, partitionToday,
+ partitionTomorrow, partitionYesterday);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(3);
+ // Verify delta directories after compaction in each partition
+ List<String> actualDeltasAfterCompPartToday =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, partitionToday);
+ Assert.assertEquals("Delta directories does not match after compaction",
+ Collections.singletonList("delta_0000001_0000003_v0000007"),
+ actualDeltasAfterCompPartToday);
+ // Verify bucket files in delta dirs
+ List<String> expectedBucketFiles = Arrays.asList("000000_0", "000001_0");
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFiles,
+ CompactorTestUtil
+ .getBucketFileNames(fs, table, partitionToday, actualDeltasAfterCompPartToday.get(0)));
+ verifyAllContents(tableName, dataProvider, expectedData);
+ // Clean up
+ dataProvider.dropTable(tableName);
+ }
+
+ @Test public void testMmMinorCompaction10DeltaDirs() throws Exception {
+ String dbName = "default";
+ String tableName = "testMmMinorCompaction";
+ // Create test table
+ TestDataProvider dataProvider = new TestDataProvider();
+ dataProvider.createMmTable(tableName, false, false);
+ // Find the location of the table
+ IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+ Table table = metaStoreClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ dataProvider.insertMmTestData(tableName, 10);
+ // Get all data before compaction is run
+ List<String> expectedData = dataProvider.getAllData(tableName);
+ Collections.sort(expectedData);
+ // Verify deltas
+ List<String> deltaNames =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+ Assert.assertEquals(10, deltaNames.size());
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ List<ShowCompactResponseElement> compacts =
+ TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals("Completed compaction queue must contain 3 element", 1, compacts.size());
+ compacts.forEach(
+ c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState()));
+ // Verify delta directories after compaction
+ List<String> actualDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+ Assert.assertEquals(Collections.singletonList("delta_0000001_0000010_v0000014"),
+ actualDeltasAfterComp);
+ // Verify bucket file in delta dir
+ List<String> expectedBucketFile = Collections.singletonList("000000_0");
+ Assert.assertEquals("Bucket names are not matching after compaction", expectedBucketFile,
+ CompactorTestUtil.getBucketFileNames(fs, table, null, actualDeltasAfterComp.get(0)));
+ verifyAllContents(tableName, dataProvider, expectedData);
+ // Clean up
+ dataProvider.dropTable(tableName);
+ }
+
+ @Test public void testMultipleMmMinorCompactions() throws Exception {
+ String dbName = "default";
+ String tableName = "testMmMinorCompaction";
+ // Create test table
+ TestDataProvider dataProvider = new TestDataProvider();
+ dataProvider.createMmTable(tableName, false, true);
+ // Find the location of the table
+ IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+ Table table = metaStoreClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ dataProvider.insertMmTestData(tableName);
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(1);
+ List<ShowCompactResponseElement> compacts;
+ // Insert test data into test table
+ dataProvider.insertMmTestData(tableName);
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(2);
+ // Insert test data into test table
+ dataProvider.insertMmTestData(tableName);
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(3);
+ // Verify delta directories after compaction
+ List<String> actualDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+ Assert.assertEquals("Delta directories does not match after compaction",
+ Collections.singletonList("delta_0000001_0000009_v0000026"), actualDeltasAfterComp);
+
+ }
+
+ @Test public void testMmMajorCompactionAfterMinor() throws Exception {
+ String dbName = "default";
+ String tableName = "testMmMinorCompaction";
+ // Create test table
+ TestDataProvider dataProvider = new TestDataProvider();
+ dataProvider.createMmTable(tableName, false, false);
+ // Find the location of the table
+ IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+ Table table = metaStoreClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ dataProvider.insertMmTestData(tableName);
+ // Get all data before compaction is run
+ List<String> expectedData = dataProvider.getAllData(tableName);
+ Collections.sort(expectedData);
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(1);
+ List<ShowCompactResponseElement> compacts;
+ // Verify delta directories after compaction
+ Assert.assertEquals("Delta directories does not match after minor compaction",
+ Collections.singletonList("delta_0000001_0000003_v0000007"),
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+ verifyAllContents(tableName, dataProvider, expectedData);
+ List<String> actualData;
+ // Insert a second round of test data into test table; update expectedData
+ dataProvider.insertMmTestData(tableName);
+ expectedData = dataProvider.getAllData(tableName);
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(2);
+ // Verify base directory after compaction
+ Assert.assertEquals("Base directory does not match after major compaction",
+ Collections.singletonList("base_0000006_v0000019"),
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+ actualData = dataProvider.getAllData(tableName);
+ Assert.assertEquals(expectedData, actualData);
+ }
+
+ @Test public void testMmMinorCompactionAfterMajor() throws Exception {
+ String dbName = "default";
+ String tableName = "testMmMinorCompaction";
+ // Create test table
+ TestDataProvider dataProvider = new TestDataProvider();
+ dataProvider.createMmTable(tableName, false, false);
+ // Find the location of the table
+ IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+ Table table = metaStoreClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ dataProvider.insertMmTestData(tableName);
+ // Get all data before compaction is run
+ List<String> expectedData = dataProvider.getAllData(tableName);
+ Collections.sort(expectedData);
+ // Run a major compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(1);
+ List<ShowCompactResponseElement> compacts;
+ // Verify base directory after compaction
+ Assert.assertEquals("Base directory does not match after major compaction",
+ Collections.singletonList("base_0000003_v0000007"),
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+ verifyAllContents(tableName, dataProvider, expectedData);
+ // Insert test data into test table
+ dataProvider.insertMmTestData(tableName);
+ expectedData = dataProvider.getAllData(tableName);
+ Collections.sort(expectedData);
+ // Run a compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(2);
+ // Verify base/delta directories after compaction
+ Assert.assertEquals("Base directory does not match after major compaction",
+ Collections.singletonList("base_0000003_v0000007"),
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.baseFileFilter, table, null));
+ Assert.assertEquals("Delta directories does not match after minor compaction",
+ Collections.singletonList("delta_0000001_0000006_v0000016"),
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+ verifyAllContents(tableName, dataProvider, expectedData);
+ }
+
+ @Test public void testMmMinorCompactionWithSchemaEvolutionAndBuckets() throws Exception {
+ String dbName = "default";
+ String tblName = "testMmMinorCompactionWithSchemaEvolutionAndBuckets";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver("create transactional table " + tblName
+ + " (a int, b int) partitioned by(ds string) clustered by (a) into 2 buckets"
+ + " stored as ORC TBLPROPERTIES('bucketing_version'='2', 'transactional'='true',"
+ + " 'transactional_properties'='insert_only')", driver);
+ // Insert some data
+ executeStatementOnDriver("insert into " + tblName
+ + " partition (ds) values"
+ + "(1,2,'today'),(1,3,'today'),(1,4,'yesterday'),"
+ + "(2,2,'yesterday'),(2,3,'today'),(2,4,'today')",
+ driver);
+ // Add a new column
+ executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver);
+ // TODO uncomment this line after HIVE-22826 fixed:
+ // executeStatementOnDriver("alter table " + tblName + " change column a aa int", driver);
+ // Insert more data
+ executeStatementOnDriver("insert into " + tblName
+ + " partition (ds) values"
+ + "(3,2,1000,'yesterday'),(3,3,1001,'today'),"
+ + "(3,4,1002,'yesterday'),(4,2,1003,'today'),"
+ + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
+ // Get all data before compaction is run
+ TestDataProvider dataProvider = new TestDataProvider();
+ List<String> expectedData = dataProvider.getAllData(tblName);
+ Collections.sort(expectedData);
+ // Run minor compaction and cleaner
+ CompactorTestUtil
+ .runCompaction(conf, dbName, tblName, CompactionType.MINOR, true, "ds=yesterday",
+ "ds=today");
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(2);
+ verifyAllContents(tblName, dataProvider, expectedData);
+ // Clean up
+ executeStatementOnDriver("drop table " + tblName, driver);
+ }
+
+ @Test public void testMmMinorCompactionWithSchemaEvolutionNoBucketsMultipleReducers()
+ throws Exception {
+ HiveConf hiveConf = new HiveConf(conf);
+ hiveConf.setIntVar(HiveConf.ConfVars.MAXREDUCERS, 2);
+ hiveConf.setIntVar(HiveConf.ConfVars.HADOOPNUMREDUCERS, 2);
+ driver = DriverFactory.newDriver(hiveConf);
+ String dbName = "default";
+ String tblName = "testMmMinorCompactionWithSchemaEvolutionNoBucketsMultipleReducers";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver(
+ "create transactional table " + tblName + " (a int, b int) partitioned by(ds string)"
+ + " stored as ORC TBLPROPERTIES('transactional'='true',"
+ + " 'transactional_properties'='insert_only')", driver);
+ // Insert some data
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values"
+ + "(1,2,'today'),(1,3,'today'),"
+ + "(1,4,'yesterday'),(2,2,'yesterday'),"
+ + "(2,3,'today'),(2,4,'today')",
+ driver);
+ // Add a new column
+ executeStatementOnDriver("alter table " + tblName + " add columns(c int)", driver);
+ // Insert more data
+ executeStatementOnDriver("insert into " + tblName
+ + " partition (ds) values(3,2,1000,'yesterday'),(3,3,1001,'today'),(3,4,1002,'yesterday'),(4,2,1003,'today'),"
+ + "(4,3,1004,'yesterday'),(4,4,1005,'today')", driver);
+ // Get all data before compaction is run
+ TestDataProvider dataProvider = new TestDataProvider();
+ List<String> expectedData = dataProvider.getAllData(tblName);
+ Collections.sort(expectedData);
+ // Run minor compaction and cleaner
+ CompactorTestUtil
+ .runCompaction(conf, dbName, tblName, CompactionType.MINOR, true, "ds=yesterday",
+ "ds=today");
+ CompactorTestUtil.runCleaner(hiveConf);
+ verifySuccessulTxn(2);
+
+ verifyAllContents(tblName, dataProvider, expectedData);
+ // Clean up
+ executeStatementOnDriver("drop table " + tblName, driver);
+ }
+
+ @Test public void testMinorMmCompactionRemovesAbortedDirs()
+ throws Exception { // see mmTableOpenWriteId
+ String dbName = "default";
+ String tableName = "testMmMinorCompaction";
+ // Create test table
+ TestDataProvider testDataProvider = new TestDataProvider();
+ testDataProvider.createMmTable(tableName, false, false);
+ // Find the location of the table
+ IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
+ Table table = metaStoreClient.getTable(dbName, tableName);
+ FileSystem fs = FileSystem.get(conf);
+ // Insert test data into test table
+ testDataProvider.insertMmTestData(tableName);
+ // Get all data before compaction is run. Expected data is 2 x MmTestData insertion
+ List<String> expectedData = new ArrayList<>();
+ List<String> oneMmTestDataInsertion = testDataProvider.getAllData(tableName);
+ expectedData.addAll(oneMmTestDataInsertion);
+ expectedData.addAll(oneMmTestDataInsertion);
+ Collections.sort(expectedData);
+ // Insert an aborted directory (txns 4-6)
+ rollbackAllTxns(true, driver);
+ testDataProvider.insertMmTestData(tableName);
+ rollbackAllTxns(false, driver);
+ // Check that delta dirs 4-6 exist
+ List<String> actualDeltasAfterComp =
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
+ Assert.assertEquals(Lists
+ .newArrayList("delta_0000001_0000001_0000", "delta_0000002_0000002_0000",
+ "delta_0000003_0000003_0000", "delta_0000004_0000004_0000",
+ "delta_0000005_0000005_0000", "delta_0000006_0000006_0000"), actualDeltasAfterComp);
+ // Insert another round of test data (txns 7-9)
+ testDataProvider.insertMmTestData(tableName);
+ verifyAllContents(tableName, testDataProvider, expectedData);
+ // Run a minor compaction
+ CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
+ CompactorTestUtil.runCleaner(conf);
+ verifySuccessulTxn(1);
+ // Verify delta directories after compaction
+ Assert.assertEquals("Delta directories does not match after minor compaction",
+ Collections.singletonList("delta_0000001_0000009_v0000014"),
+ CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null));
+ verifyAllContents(tableName, testDataProvider, expectedData);
+ }
+
+ /**
+ * Verify that the expected number of transactions have run, and their state is "succeeded".
+ *
+ * @param expectedCompleteCompacts number of compactions already run
+ * @throws MetaException
+ */
+ private void verifySuccessulTxn(int expectedCompleteCompacts) throws MetaException {
+ List<ShowCompactResponseElement> compacts =
+ TxnUtils.getTxnStore(conf).showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals("Completed compaction queue must contain one element",
+ expectedCompleteCompacts, compacts.size());
+ compacts.forEach(
+ c -> Assert.assertEquals("Compaction state is not succeeded", "succeeded", c.getState()));
+ }
+
+ /**
+ * Results of a select on the table results in the same data as expectedData.
+ */
+ private void verifyAllContents(String tblName, TestDataProvider dataProvider,
+ List<String> expectedData) throws Exception {
+ List<String> actualData = dataProvider.getAllData(tblName);
+ Collections.sort(actualData);
+ Assert.assertEquals(expectedData, actualData);
+ }
+
+ /**
+ * Set to true to cause all transactions to be rolled back, until set back to false.
+ */
+ private static void rollbackAllTxns(boolean val, IDriver driver) {
+ driver.getConf().setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, val);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index bb70db4..739f2b6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -394,21 +394,6 @@ public class CompactorMR {
HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
}
- // Remove the directories for aborted transactions only
- private void removeFilesForMmTable(HiveConf conf, Directory dir) throws IOException {
- // For MM table, we only want to delete delta dirs for aborted txns.
- List<Path> filesToDelete = dir.getAbortedDirectories();
- if (filesToDelete.size() < 1) {
- return;
- }
- LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
- FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
- for (Path dead : filesToDelete) {
- LOG.debug("Going to delete path " + dead.toString());
- fs.delete(dead, true);
- }
- }
-
public JobConf getMrJob() {
return mrJob;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index bad5d00..48387c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -20,37 +20,23 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.util.DirectionUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.HashSet;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
/**
* Class responsible to run query based major compaction on insert only tables.
@@ -66,14 +52,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
AcidUtils.Directory dir = AcidUtils
.getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
table.getParameters(), false);
- removeFilesForMmTable(hiveConf, dir);
-
- // Then, actually do the compaction.
- if (!compactionInfo.isMajorCompaction()) {
- // Not supported for MM tables right now.
- LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction");
- return;
- }
+ MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
return;
@@ -129,107 +108,10 @@ final class MmMajorQueryCompactor extends QueryCompactor {
fs.delete(fromPath, true);
}
- // Remove the directories for aborted transactions only
- private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
- // For MM table, we only want to delete delta dirs for aborted txns.
- List<Path> filesToDelete = dir.getAbortedDirectories();
- if (filesToDelete.size() < 1) {
- return;
- }
- LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
- FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
- for (Path dead : filesToDelete) {
- LOG.debug("Going to delete path " + dead.toString());
- fs.delete(dead, true);
- }
- }
-
- private List<String> getCreateQueries(String fullName, Table t, StorageDescriptor sd, String location) {
- StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("(");
- List<FieldSchema> cols = t.getSd().getCols();
- boolean isFirst = true;
- for (FieldSchema col : cols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("`").append(col.getName()).append("` ").append(col.getType());
- }
- query.append(") ");
-
- // Bucketing.
- List<String> buckCols = t.getSd().getBucketCols();
- if (buckCols.size() > 0) {
- query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
- List<Order> sortCols = t.getSd().getSortCols();
- if (sortCols.size() > 0) {
- query.append("SORTED BY (");
- isFirst = true;
- for (Order sortCol : sortCols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
- }
- query.append(") ");
- }
- query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
- }
-
- // Stored as directories. We don't care about the skew otherwise.
- if (t.getSd().isStoredAsSubDirectories()) {
- SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
- if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
- query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
- isFirst = true;
- for (List<String> colValues : skewedInfo.getSkewedColValues()) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("('").append(StringUtils.join("','", colValues)).append("')");
- }
- query.append(") STORED AS DIRECTORIES");
- }
- }
-
- SerDeInfo serdeInfo = sd.getSerdeInfo();
- Map<String, String> serdeParams = serdeInfo.getParameters();
- query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
- .append("'");
- String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
- assert sh == null; // Not supposed to be a compactable table.
- if (!serdeParams.isEmpty()) {
- ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
- }
- query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
- .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
- .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
- // Exclude all standard table properties.
- Set<String> excludes = getHiveMetastoreConstants();
- excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
- isFirst = true;
- for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
- if (e.getValue() == null) {
- continue;
- }
- if (excludes.contains(e.getKey())) {
- continue;
- }
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
- .append("'");
- }
- if (!isFirst) {
- query.append(", ");
- }
- query.append("'transactional'='false')");
- return Lists.newArrayList(query.toString());
-
+ private List<String> getCreateQueries(String tmpTableName, Table table,
+ StorageDescriptor storageDescriptor, String baseLocation) {
+ return Lists.newArrayList(MmQueryCompactorUtils
+ .getCreateQuery(tmpTableName, table, storageDescriptor, baseLocation, false, false));
}
private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
@@ -263,28 +145,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
}
private List<String> getDropQueries(String tmpTableName) {
- return Lists.newArrayList("drop table if exists " + tmpTableName);
+ return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableName);
}
- private static Set<String> getHiveMetastoreConstants() {
- Set<String> result = new HashSet<>();
- for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
- if (!Modifier.isStatic(f.getModifiers())) {
- continue;
- }
- if (!Modifier.isFinal(f.getModifiers())) {
- continue;
- }
- if (!String.class.equals(f.getType())) {
- continue;
- }
- f.setAccessible(true);
- try {
- result.add((String) f.get(null));
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
- return result;
- }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
new file mode 100644
index 0000000..feb667c
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Runs a query-based minor compaction on an insert-only (MM) table or partition: the valid
+ * delta directories are exposed through a temporary external table (one partition per delta),
+ * their rows are inserted into a temporary "result" table, and the result table's directory is
+ * then moved into the table/partition as a single new delta directory.
+ */
+final class MmMinorQueryCompactor extends QueryCompactor {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MmMinorQueryCompactor.class.getName());
+
+  /** Suffix appended to the source temp table's name to get the result temp table's name. */
+  private static final String RESULT_TABLE_SUFFIX = "_result";
+
+  /**
+   * Removes the delta directories of aborted transactions, then compacts the remaining deltas
+   * (up to the high watermark of {@code writeIds}) into a single new delta directory.
+   */
+  @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition,
+      StorageDescriptor storageDescriptor, ValidWriteIdList writeIds, CompactionInfo compactionInfo)
+      throws IOException {
+    LOG.debug("Going to delete directories for aborted transactions for MM table {}.{}",
+        table.getDbName(), table.getTableName());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(storageDescriptor.getLocation()),
+        hiveConf, writeIds, Ref.from(false), false, table.getParameters(), false);
+    MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
+
+    // Both temp tables live under one fresh tmp dir: the source table at its root, the result
+    // table in its "_result" subdirectory.
+    String tmpLocation = Util.generateTmpPath(storageDescriptor);
+    Path sourceTabLocation = new Path(tmpLocation);
+    Path resultTabLocation = new Path(tmpLocation, "_result");
+
+    HiveConf driverConf = setUpDriverSession(hiveConf);
+
+    // Temp table names are suffixed with the current time in milliseconds.
+    String tmpTableBase = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_"
+        + System.currentTimeMillis();
+
+    List<String> createTableQueries =
+        getCreateQueries(tmpTableBase, table, partition == null ? table.getSd() : partition.getSd(),
+            sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds);
+    List<String> compactionQueries = getCompactionQueries(tmpTableBase, table.getSd());
+    List<String> dropQueries = getDropQueries(tmpTableBase);
+    runCompactionQueries(driverConf, tmpTableBase, storageDescriptor, writeIds, compactionInfo,
+        createTableQueries, compactionQueries, dropQueries);
+  }
+
+  /**
+   * Moves the "result table" directory into the table/partition being compacted, as a new delta
+   * directory covering write ids 1 through the high watermark of {@code actualWriteIds}. If the
+   * result table directory does not exist, an empty delta directory is created instead.
+   *
+   * @throws IOException if the result directory cannot be renamed into place; the result data is
+   *     then left untouched rather than deleted
+   */
+  @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+      ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
+    org.apache.hadoop.hive.ql.metadata.Table resultTable =
+        Hive.get().getTable(tmpTableName + RESULT_TABLE_SUFFIX);
+    String from = resultTable.getSd().getLocation();
+    Path fromPath = new Path(from);
+    Path toPath = new Path(dest);
+    FileSystem fs = fromPath.getFileSystem(conf);
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false)
+        .isCompressed(false).minimumWriteId(1).maximumWriteId(actualWriteIds.getHighWatermark())
+        .bucket(0).statementId(-1).visibilityTxnId(compactorTxnId);
+    // The new delta directory is the parent of the bucket file path derived from the options.
+    Path newDeltaDir = AcidUtils.createFilename(toPath, options).getParent();
+    if (!fs.exists(fromPath)) {
+      LOG.info(from + " not found. Assuming 0 splits. Creating " + newDeltaDir);
+      fs.mkdirs(newDeltaDir);
+      return;
+    }
+    LOG.info("Moving contents of " + from + " to " + dest);
+    // FileSystem#rename may report failure by returning false instead of throwing; the result
+    // data is then still at fromPath, so fail the compaction rather than report success.
+    if (!fs.rename(fromPath, newDeltaDir)) {
+      throw new IOException("Unable to rename " + fromPath + " to " + newDeltaDir);
+    }
+  }
+
+  /**
+   * Get a list of create/alter table queries. These tables serve as temporary data source for
+   * query based minor compaction. The following tables are created:
+   * <ol>
+   *   <li>tmpTable - "source table": temporary, external, partitioned table. Each partition
+   *   points to exactly one delta directory in the table/partition to compact</li>
+   *   <li>tmpTable_result - "result table": temporary table which stores the aggregated
+   *   results of the minor compaction query until the compaction can be committed</li>
+   * </ol>
+   *
+   * @param tmpTableBase name of the first temp table (second will be $tmpTableBase_result)
+   * @param t Table to compact
+   * @param sd storage descriptor of table or partition to compact
+   * @param sourceTabLocation location the "source table" (temp table 1) should go
+   * @param resultTabLocation location the "result table" (temp table 2) should go
+   * @param dir the parent directory of delta directories
+   * @param validWriteIdList valid write ids for the table/partition to compact
+   * @return the create statement of the source table, the alter table statement adding its
+   *     partitions (omitted when no delta qualifies), and the create statement of the result table
+   */
+  private List<String> getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd,
+      String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir,
+      ValidWriteIdList validWriteIdList) {
+    List<String> queries = new ArrayList<>();
+    queries.add(
+        MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true, true));
+    buildAlterTableQuery(tmpTableBase, dir, validWriteIdList).ifPresent(queries::add);
+    queries.add(MmQueryCompactorUtils.getCreateQuery(tmpTableBase + RESULT_TABLE_SUFFIX, t, sd,
+        resultTabLocation, false, false));
+    return queries;
+  }
+
+  /**
+   * Builds an alter table query, which adds partitions pointing to location of delta directories.
+   * Every current delta whose maximum write id is at or below the high watermark is included.
+   * Write ids below the min open write id belong to no open transaction, so deltas made of them
+   * are exactly what should be compacted; the min open write id must therefore not be used as a
+   * lower bound on a delta's minimum write id.
+   *
+   * @param tableName name of the temp table to be altered
+   * @param dir the parent directory of delta directories
+   * @param validWriteIdList valid write ids for the table/partition to compact
+   * @return alter table statement wrapped in {@link Optional}; empty if no delta qualifies
+   */
+  private Optional<String> buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
+      ValidWriteIdList validWriteIdList) {
+    long highWatermark = validWriteIdList.getHighWatermark();
+    List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream()
+        .filter(delta -> delta.getMaxWriteId() <= highWatermark)
+        .collect(Collectors.toList());
+    if (deltas.isEmpty()) {
+      return Optional.empty();
+    }
+    StringBuilder query = new StringBuilder("alter table ").append(tableName).append(" add ");
+    for (AcidUtils.ParsedDelta delta : deltas) {
+      // One partition per delta; the partition value is the delta directory's own name.
+      query.append("partition (file_name='").append(delta.getPath().getName())
+          .append("') location '").append(delta.getPath()).append("' ");
+    }
+    return Optional.of(query.toString());
+  }
+
+  /**
+   * Get a list containing just the minor compaction query. The query selects the content of the
+   * source temporary table and inserts it into the result table. It will look like:
+   * <ol>
+   *   <li>insert into table $tmpTableBase_result select `col_1`, .. from $tmpTableBase</li>
+   * </ol>
+   *
+   * @param tmpTableBase an unique identifier, which helps to find all the temporary tables
+   * @param sd storage descriptor whose columns are selected, in order
+   * @return list of compaction queries, always non-null
+   */
+  private List<String> getCompactionQueries(String tmpTableBase, StorageDescriptor sd) {
+    String columns = sd.getCols().stream()
+        .map(FieldSchema::getName)
+        .map(name -> "`" + name + "`")
+        .collect(Collectors.joining(", "));
+    return Lists.newArrayList("insert into table " + tmpTableBase + RESULT_TABLE_SUFFIX
+        + " select " + columns + " from " + tmpTableBase);
+  }
+
+  /**
+   * Get list of drop table statements.
+   *
+   * @param tmpTableBase an unique identifier, which helps to find all the temp tables
+   * @return list of drop table statements, always non-null
+   */
+  private List<String> getDropQueries(String tmpTableBase) {
+    return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase,
+        MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase + RESULT_TABLE_SUFFIX);
+  }
+
+  /**
+   * Copies {@code hiveConf} for the driver that runs the compaction queries: enables column
+   * identifier quoting (the generated queries quote column names with backticks) and disables
+   * column stats fetching and stats estimation.
+   */
+  private HiveConf setUpDriverSession(HiveConf hiveConf) {
+    HiveConf driverConf = new HiveConf(hiveConf);
+    driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+    driverConf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_FETCH_COLUMN_STATS, false);
+    driverConf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false);
+    return driverConf;
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
new file mode 100644
index 0000000..891696d
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Helpers shared by the query-based compactors of insert-only (MM) tables.
+ */
+final class MmQueryCompactorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MmQueryCompactorUtils.class.getName());
+  static final String DROP_IF_EXISTS = "drop table if exists ";
+
+  private MmQueryCompactorUtils() {}
+
+  /**
+   * Creates a command to create a new temporary table based on an example table (sourceTab).
+   *
+   * @param fullName of new table
+   * @param sourceTab the table we are modeling the new table on; its columns, bucketing,
+   *     sorting, skew and table properties are copied
+   * @param sd StorageDescriptor of the table or partition we are modeling the new table on; its
+   *     serde, input format and output format are copied
+   * @param location of the new table
+   * @param isPartitioned should the new table be partitioned (by a single string column,
+   *     {@code file_name})
+   * @param isExternal should the new table be external
+   * @return query string creating the new table; the table is always created with
+   *     'transactional'='false'
+   */
+  static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor sd,
+      String location, boolean isPartitioned, boolean isExternal) {
+    StringBuilder query = new StringBuilder("create temporary ");
+    if (isExternal) {
+      query.append("external ");
+    }
+    query.append("table ").append(fullName).append("(");
+    List<FieldSchema> cols = sourceTab.getSd().getCols();
+    boolean isFirst = true;
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(col.getType());
+    }
+    query.append(") ");
+
+    // Partitioning. Used for minor compaction.
+    if (isPartitioned) {
+      query.append(" PARTITIONED BY (`file_name` STRING) ");
+    }
+
+    // Bucketing.
+    List<String> buckCols = sourceTab.getSd().getBucketCols();
+    if (buckCols != null && !buckCols.isEmpty()) {
+      query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+      List<Order> sortCols = sourceTab.getSd().getSortCols();
+      if (sortCols != null && !sortCols.isEmpty()) {
+        query.append("SORTED BY (");
+        isFirst = true;
+        for (Order sortCol : sortCols) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
+        }
+        query.append(") ");
+      }
+      query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS");
+    }
+
+    // Stored as directories. We don't care about the skew otherwise.
+    if (sourceTab.getSd().isStoredAsSubDirectories()) {
+      SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
+      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+        // Grammar: SKEWED BY (col, ...) ON ((v, ...), ...) STORED AS DIRECTORIES, so the value
+        // list needs its own enclosing parentheses, opened here and closed below.
+        query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames()))
+            .append(") ON (");
+        isFirst = true;
+        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+          if (!isFirst) {
+            query.append(", ");
+          }
+          isFirst = false;
+          query.append("('").append(StringUtils.join("','", colValues)).append("')");
+        }
+        query.append(") STORED AS DIRECTORIES");
+      }
+    }
+
+    SerDeInfo serdeInfo = sd.getSerdeInfo();
+    Map<String, String> serdeParams = serdeInfo.getParameters();
+    query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+        .append("'");
+    String sh = sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+    assert sh == null; // Not supposed to be a compactable table.
+    if (serdeParams != null && !serdeParams.isEmpty()) {
+      ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
+    }
+    query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
+        .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
+        .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+    // Exclude all standard table properties.
+    Set<String> excludes = getHiveMetastoreConstants();
+    excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+    isFirst = true;
+    for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
+      if (e.getValue() == null) {
+        continue;
+      }
+      if (excludes.contains(e.getKey())) {
+        continue;
+      }
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
+          .append("'");
+    }
+    if (!isFirst) {
+      query.append(", ");
+    }
+    // The temp table is always non-transactional.
+    query.append("'transactional'='false')");
+    return query.toString();
+  }
+
+  /**
+   * Collects the values of all static final String fields of {@link hive_metastoreConstants}.
+   *
+   * @return a new, mutable set that the caller may extend
+   */
+  private static Set<String> getHiveMetastoreConstants() {
+    Set<String> result = new HashSet<>();
+    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+      if (!Modifier.isStatic(f.getModifiers())) {
+        continue;
+      }
+      if (!Modifier.isFinal(f.getModifiers())) {
+        continue;
+      }
+      if (!String.class.equals(f.getType())) {
+        continue;
+      }
+      f.setAccessible(true);
+      try {
+        result.add((String) f.get(null));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Remove the delta directories of aborted transactions.
+   *
+   * @param conf configuration used to resolve the directories' file system
+   * @param dir acid state whose aborted directories are deleted (recursively)
+   * @throws IOException if the file system cannot be obtained or a delete fails with an error
+   */
+  static void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
+    List<Path> filesToDelete = dir.getAbortedDirectories();
+    if (filesToDelete.isEmpty()) {
+      return;
+    }
+    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
+    FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+    for (Path dead : filesToDelete) {
+      LOG.debug("Going to delete path " + dead.toString());
+      if (!fs.delete(dead, true)) {
+        // Best effort: a directory that was not deleted is logged, not treated as an error.
+        LOG.warn("Could not delete aborted directory " + dead);
+      }
+    }
+  }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
index 2f2bb21..6542eef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -62,7 +62,11 @@ final class QueryCompactorFactory {
if (AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf
.getBoolVar(configuration, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
- return new MmMajorQueryCompactor();
+ if (compactionInfo.isMajorCompaction()) {
+ return new MmMajorQueryCompactor();
+ } else {
+ return new MmMinorQueryCompactor();
+ }
}
return null;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 88ca683..e56d831 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1978,43 +1978,6 @@ public class TestTxnCommands2 {
}
/**
- * Test compaction for Micro-managed table
- * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables
- * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any
- * @throws Exception
- */
- @Test
- public void testMmTableCompaction() throws Exception {
- // 1. Insert some rows into MM table
- runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
- runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)");
- // There should be 2 delta directories
- verifyDirAndResult(2);
-
- // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay.
- runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
- runWorker(hiveConf);
- verifyDirAndResult(2);
-
- // 3. Let a transaction be aborted
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
- runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)");
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
- // There should be 3 delta directories. The new one is the aborted one.
- verifyDirAndResult(3);
-
- // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction.
- runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
- runWorker(hiveConf);
- // The worker should remove the subdir for aborted transaction
- verifyDirAndResult(2);
-
- // 5. Run Cleaner. Shouldn't impact anything.
- runCleaner(hiveConf);
- verifyDirAndResult(2);
- }
-
- /**
* Test cleaner for TXN_TO_WRITE_ID table.
* @throws Exception
*/
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
index aabf15c..1de25cf 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -105,42 +105,6 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
runStatementOnDriver("drop table if exists " + t);
}
}
- /**
- * Test compaction for Micro-managed table
- * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables
- * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any
- * @throws Exception
- */
- @Test
- public void testMmTableCompaction() throws Exception {
- // 1. Insert some rows into MM table
- runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
- runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
- // There should be 2 delta directories
- verifyDirAndResult(2);
-
- // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay.
- runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
- runWorker(hiveConf);
- verifyDirAndResult(2);
-
- // 3. Let a transaction be aborted
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
- runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(5,6)");
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
- // There should be 3 delta directories. The new one is the aborted one.
- verifyDirAndResult(3);
-
- // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction.
- runStatementOnDriver("alter table "+ TableExtended.MMTBL + " compact 'MINOR'");
- runWorker(hiveConf);
- // The worker should remove the subdir for aborted transaction
- verifyDirAndResult(2);
-
- // 5. Run Cleaner. Shouldn't impact anything.
- runCleaner(hiveConf);
- verifyDirAndResult(2);
- }
/**
* Test a scenario, on a micro-managed table, where an IOW comes in
@@ -523,96 +487,6 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
verifyDirAndResult(0, true);
}
- @Test
- public void testSnapshotIsolationWithAbortedTxnOnMmTable() throws Exception {
-
- // Insert two rows into the table.
- runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(1,2)");
- runStatementOnDriver("insert into " + TableExtended.MMTBL + "(a,b) values(3,4)");
- // There should be 2 delta directories
- verifyDirAndResult(2);
-
- // Initiate a minor compaction request on the table.
- runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MINOR'");
-
- // Run Compaction Worker to do compaction.
- // But we do not compact a MM table but only transit the compaction request to
- // "ready for cleaning" state in this case.
- runWorker(hiveConf);
- verifyDirAndResult(2);
-
- // Start an INSERT statement transaction and roll back this transaction.
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
- runStatementOnDriver("insert into " + TableExtended.MMTBL + " values (5, 6)");
- hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
- /**
- * There should be 3 delta directories. The new one is the aborted one.
- *
- * target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1541637725613/warehouse/mmtbl/
- ├── delta_0000001_0000001_0000
- │ └── 000000_0
- ├── delta_0000002_0000002_0000
- │ └── 000000_0
- └── delta_0000003_0000003_0000
- └── 000000_0
- */
- verifyDirAndResult(3);
-
- // Execute SELECT statement and verify the result set (should be two rows).
- int[][] expected = new int[][] {{1, 2}, {3, 4}};
- List<String> rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
- Assert.assertEquals(stringifyValues(expected), rs);
-
- // Run Cleaner.
- // delta_0000003_0000003_0000 produced by the aborted txn is removed even though it is
- // above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID since all data in it is aborted
- // This run does transition the entry "successful".
- runCleaner(hiveConf);
- verifyDirAndResult(2);
-
- // Execute SELECT and verify that aborted operation is not counted for MM table.
- rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
- Assert.assertEquals(stringifyValues(expected), rs);
-
- // Run initiator to execute CompactionTxnHandler.cleanEmptyAbortedTxns()
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
- 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
- TestTxnCommands2.runInitiator(hiveConf);
- // This run of Initiator doesn't add any compaction_queue entry
- // since we only have one MM table with data - we don't compact MM tables.
- verifyDirAndResult(2);
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
- 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
-
- // Execute SELECT statement and verify that aborted INSERT statement is not counted.
- rs = runStatementOnDriver("select a,b from " + TableExtended.MMTBL + " order by a,b");
- Assert.assertEquals(stringifyValues(expected), rs);
-
- // Initiate a minor compaction request on the table.
- runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MINOR'");
-
- // Run worker to delete aborted transaction's delta directory.
- runWorker(hiveConf);
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
- 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_COMPONENTS"),
- 1,
- TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_COMPONENTS"));
- verifyDirAndResult(2);
-
- // Run Cleaner to delete rows for the aborted transaction
- // from TXN_COMPONENTS.
- runCleaner(hiveConf);
-
- // Run initiator to clean the row fro the aborted transaction from TXNS.
- TestTxnCommands2.runInitiator(hiveConf);
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
- 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_COMPONENTS"),
- 0,
- TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_COMPONENTS"));
- }
-
private void verifyDirAndResult(int expectedDeltas) throws Exception {
verifyDirAndResult(expectedDeltas, false);
}