You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2022/06/09 08:52:23 UTC
[hive] branch master updated: HIVE-26293: Migrate remaining exclusive DDL operations to EXCL_WRITE lock & bug fixes (Denys Kuzmenko, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c55318eb586 HIVE-26293: Migrate remaining exclusive DDL operations to EXCL_WRITE lock & bug fixes (Denys Kuzmenko, reviewed by Peter Vary)
c55318eb586 is described below
commit c55318eb586e81d1589d56b2349333c4a1359459
Author: Denys Kuzmenko <dk...@cloudera.com>
AuthorDate: Thu Jun 9 10:52:13 2022 +0200
HIVE-26293: Migrate remaining exclusive DDL operations to EXCL_WRITE lock & bug fixes (Denys Kuzmenko, reviewed by Peter Vary)
Closes #3103
---
.../hadoop/hive/ql/ddl/misc/msck/MsckAnalyzer.java | 3 +-
.../hive/ql/ddl/table/drop/DropTableAnalyzer.java | 2 -
.../storage/skewed/AlterTableSkewedByAnalyzer.java | 4 +
.../drop/DropMaterializedViewAnalyzer.java | 2 -
.../apache/hadoop/hive/ql/hooks/WriteEntity.java | 118 +++++++----
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 39 ++--
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 9 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 13 +-
.../org/apache/hadoop/hive/ql/TestTxnCommands.java | 53 +++--
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 22 ++
.../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 5 +-
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 227 ++++++++++++++++++++-
.../hadoop/hive/ql/parse/TestParseUtils.java | 32 ++-
.../hadoop/hive/metastore/HiveMetaStoreClient.java | 4 +-
.../hadoop/hive/metastore/AcidEventListener.java | 103 +++++-----
.../apache/hadoop/hive/metastore/HMSHandler.java | 31 ++-
.../hadoop/hive/metastore/txn/TxnHandler.java | 18 +-
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 4 +
.../hive/metastore/txn/ThrowingTxnHandler.java | 9 +
19 files changed, 531 insertions(+), 167 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckAnalyzer.java
index e070cec99bf..1ed631321cf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/misc/msck/MsckAnalyzer.java
@@ -87,7 +87,8 @@ public class MsckAnalyzer extends AbstractFunctionAnalyzer {
}
if (repair && AcidUtils.isTransactionalTable(table)) {
- outputs.add(new WriteEntity(table, WriteType.DDL_EXCLUSIVE));
+ outputs.add(new WriteEntity(table, AcidUtils.isLocklessReadsEnabled(table, conf) ?
+ WriteType.DDL_EXCL_WRITE : WriteType.DDL_EXCLUSIVE));
} else {
outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_SHARED));
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableAnalyzer.java
index 1a3a77f436f..b36ad17234f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/drop/DropTableAnalyzer.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
-
/**
* Analyzer for table dropping commands.
*/
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/skewed/AlterTableSkewedByAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/skewed/AlterTableSkewedByAnalyzer.java
index e02d65d1e33..369e44117f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/skewed/AlterTableSkewedByAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/skewed/AlterTableSkewedByAnalyzer.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
@@ -57,6 +58,9 @@ public class AlterTableSkewedByAnalyzer extends AbstractAlterTableAnalyzer {
Table table = getTable(tableName);
validateAlterTableType(table, AlterTableType.SKEWED_BY, false);
+ if (AcidUtils.isLocklessReadsEnabled(table, conf)) {
+ throw new UnsupportedOperationException(command.getText());
+ }
inputs.add(new ReadEntity(table));
outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_EXCLUSIVE));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/drop/DropMaterializedViewAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/drop/DropMaterializedViewAnalyzer.java
index 433fab5e91d..4f2af904cf6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/drop/DropMaterializedViewAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/materialized/drop/DropMaterializedViewAnalyzer.java
@@ -34,8 +34,6 @@ import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.SemanticException;
-import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
-
/**
* Analyzer for drop materialized view commands.
*/
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
index 567da91b48b..d3ab6d21e01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
@@ -206,49 +206,85 @@ public class WriteEntity extends Entity implements Serializable {
*/
public static WriteType determineAlterTableWriteType(AlterTableType op, Table table, HiveConf conf) {
switch (op) {
- case CLUSTERED_BY:
- case NOT_SORTED:
- case NOT_CLUSTERED:
- case SET_FILE_FORMAT:
- case SET_SERDE:
- case DROPPROPS:
- case ARCHIVE:
- case UNARCHIVE:
- case ALTERLOCATION:
- case DROPPARTITION:
- case RENAMEPARTITION:
- case SKEWED_BY:
- case SET_SKEWED_LOCATION:
- case INTO_BUCKETS:
- case ALTERPARTITION:
- case TRUNCATE:
- case MERGEFILES:
- case OWNER:
- return WriteType.DDL_EXCLUSIVE;
-
- case ADDCOLS:
- case REPLACE_COLUMNS:
- case RENAME_COLUMN:
- case ADD_CONSTRAINT:
- case DROP_CONSTRAINT:
- case RENAME:
- return AcidUtils.isLocklessReadsEnabled(table, conf) ?
- WriteType.DDL_EXCL_WRITE : WriteType.DDL_EXCLUSIVE;
-
- case ADDPARTITION:
- case SET_SERDE_PROPS:
- case ADDPROPS:
- case UPDATESTATS:
- return WriteType.DDL_SHARED;
-
- case COMPACT:
- case TOUCH:
- return WriteType.DDL_NO_LOCK;
-
- default:
- throw new RuntimeException("Unknown operation " + op.toString());
+ case ARCHIVE:
+ case UNARCHIVE:
+ // Archiving methods are currently disabled
+ case ALTERLOCATION:
+ // alter table {table_name} [partition ({partition_spec})] set location "{new_location}"
+ case DROPPARTITION:
+ // Not used, @see org.apache.hadoop.hive.ql.ddl.table.partition.drop.AlterTableDropPartitionAnalyzer
+ // alter table {table_name} drop [if exists] partition ({partition_spec}) [, partition ({partition_spec}), ...] [purge]
+ case RENAMEPARTITION:
+ // Not used, @see org.apache.hadoop.hive.ql.ddl.table.partition.rename.AlterTableRenamePartitionAnalyzer
+ // alter table {table_name} partition {partition_spec} rename to partition {partition_spec}
+ case SKEWED_BY:
+ // Not used, @see org.apache.hadoop.hive.ql.ddl.table.storage.skewed.AlterTableSkewedByAnalyzer
+ // alter table {table_name} skewed by (col_name1, col_name2, ...)
+ // on ([(col_name1_value, col_name2_value, ...) [, (col_name1_value, col_name2_value), ...] [stored as directories]
+ case SET_SKEWED_LOCATION:
+ // alter table {table_name} set skewed location (col_name1="location1" [, col_name2="location2", ...] )
+ case INTO_BUCKETS:
+ // Not used, @see org.apache.hadoop.hive.ql.ddl.table.storage.cluster.AlterTableIntoBucketsAnalyzer
+ // alter table {table_name} [partition ({partition_spec})] into {bucket_number} buckets
+ case ALTERPARTITION:
+ // Not used: @see org.apache.hadoop.hive.ql.ddl.table.partition.alter.AlterTableAlterPartitionAnalyzer
+ // alter table {table_name} partition column ({column_name} {column_type})
+ case TRUNCATE:
+ // truncate table {table_name} [partition ({partition_spec})] columns ({column_spec})
+ // Also @see org.apache.hadoop.hive.ql.ddl.table.misc.truncate.TruncateTableAnalyzer
+ case MERGEFILES:
+ // alter table {table_name} [partition (partition_key = 'partition_value' [, ...])] concatenate
+ // Also @see org.apache.hadoop.hive.ql.ddl.table.storage.concatenate.AlterTableConcatenateAnalyzer
+ if (AcidUtils.isLocklessReadsEnabled(table, conf)) {
+ throw new UnsupportedOperationException(op.name());
+ } else {
+ return WriteType.DDL_EXCLUSIVE;
+ }
+
+ case CLUSTERED_BY:
+ // alter table {table_name} clustered by (col_name, col_name, ...) [sorted by (col_name, ...)]
+ // into {num_buckets} buckets;
+ case NOT_SORTED:
+ case NOT_CLUSTERED:
+ case SET_FILE_FORMAT:
+ // alter table {table_name} [partition ({partition_spec})] set fileformat {file_format}
+ case SET_SERDE:
+ // alter table {table_name} [PARTITION ({partition_spec})] set serde '{serde_class_name}'
+ case ADDCOLS:
+ case REPLACE_COLUMNS:
+ // alter table {table_name} [partition ({partition_spec})] add/replace columns ({col_name} {data_type})
+ case RENAME_COLUMN:
+ // alter table {table_name} [partition ({partition_spec})] change column {column_name} {column_name} {data_type}
+ case ADD_CONSTRAINT:
+ case DROP_CONSTRAINT:
+ case OWNER:
+ case RENAME:
+ // alter table {table_name} rename to {new_table_name}
+ case DROPPROPS:
+ return AcidUtils.isLocklessReadsEnabled(table, conf) ?
+ WriteType.DDL_EXCL_WRITE : WriteType.DDL_EXCLUSIVE;
+
+ case ADDPARTITION:
+ // Not used: @see org.apache.hadoop.hive.ql.ddl.table.partition.add.AbstractAddPartitionAnalyzer
+ // alter table {table_name} add [if not exists] partition ({partition_spec}) [location '{location}']
+ // [, partition ({partition_spec}) [location '{location}'], ...];
+ case SET_SERDE_PROPS:
+ case ADDPROPS:
+ case UPDATESTATS:
+ return WriteType.DDL_SHARED;
+
+ case COMPACT:
+ // alter table {table_name} [partition (partition_key = 'partition_value' [, ...])]
+ // compact 'compaction_type'[and wait] [with overwrite tblproperties ("property"="value" [, ...])];
+ case TOUCH:
+ // alter table {table_name} touch [partition ({partition_spec})]
+ return WriteType.DDL_NO_LOCK;
+
+ default:
+ throw new RuntimeException("Unknown operation " + op.toString());
}
}
+
public boolean isDynamicPartitionWrite() {
return isDynamicPartitionWrite;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index abdd009acde..45684df17e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -2157,7 +2157,7 @@ public class AcidUtils {
*/
public static long getLogicalLength(FileSystem fs, FileStatus file) throws IOException {
Path acidDir = file.getPath().getParent(); //should be base_x or delta_x_y_
- if(AcidUtils.isInsertDelta(acidDir)) {
+ if (AcidUtils.isInsertDelta(acidDir)) {
ParsedDeltaLight pd = ParsedDeltaLight.parse(acidDir);
if(!pd.mayContainSideFile()) {
return file.getLen();
@@ -3013,7 +3013,7 @@ public class AcidUtils {
compBuilder.setOperationType(DataOperationType.NO_TXN);
break;
case INSERT_OVERWRITE:
- t = AcidUtils.getTable(output);
+ assert t != null;
if (AcidUtils.isTransactionalTable(t)) {
if (conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) && !sharedWrite) {
compBuilder.setExclusive();
@@ -3037,7 +3037,11 @@ public class AcidUtils {
if (isExclMergeInsert) {
compBuilder.setExclWrite();
} else {
- compBuilder.setSharedRead();
+ if (AcidUtils.isLocklessReadsEnabled(t, conf)) {
+ compBuilder.setSharedWrite();
+ } else {
+ compBuilder.setSharedRead();
+ }
}
}
if (isExclMergeInsert) {
@@ -3197,23 +3201,28 @@ public class AcidUtils {
private static boolean isSoftDeleteTxn(Configuration conf, ASTNode tree) {
boolean locklessReadsEnabled = HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED);
-
+
switch (tree.getToken().getType()) {
+ case HiveParser.TOK_DROPDATABASE:
case HiveParser.TOK_DROPTABLE:
case HiveParser.TOK_DROP_MATERIALIZED_VIEW:
- return locklessReadsEnabled
+ return locklessReadsEnabled
|| HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX);
-
- case HiveParser.TOK_ALTERTABLE_DROPPARTS:
- return locklessReadsEnabled
- || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE);
-
- case HiveParser.TOK_ALTERTABLE_RENAMEPART:
- return locklessReadsEnabled
- || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_RENAME_PARTITION_MAKE_COPY);
- default:
- return false;
+
+ case HiveParser.TOK_ALTERTABLE: {
+ boolean isDropParts = tree.getFirstChildWithType(HiveParser.TOK_ALTERTABLE_DROPPARTS) != null;
+ if (isDropParts) {
+ return locklessReadsEnabled
+ || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE);
+ }
+ boolean isRenamePart = tree.getFirstChildWithType(HiveParser.TOK_ALTERTABLE_RENAMEPART) != null;
+ if (isRenamePart) {
+ return locklessReadsEnabled
+ || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_RENAME_PARTITION_MAKE_COPY);
+ }
+ }
}
+ return false;
}
public static String getPathSuffix(long txnId) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 719c3edf246..e30f5b09052 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1309,7 +1309,8 @@ public class Hive {
boolean createTableUseSuffix = HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX)
|| HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED);
- if (createTableUseSuffix) {
+ if (createTableUseSuffix
+ && (tbl.getSd().getLocation() == null || tbl.getSd().getLocation().isEmpty())) {
tbl.setProperty(SOFT_DELETE_TABLE, Boolean.TRUE.toString());
}
tTbl.setTxnId(ss.getTxnMgr().getCurrentTxnId());
@@ -1375,11 +1376,7 @@ public class Hive {
}
public void dropTable(Table table, boolean ifPurge) throws HiveException {
- boolean tableWithSuffix = (HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX)
- || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED))
- && AcidUtils.isTransactionalTable(table)
- && Boolean.parseBoolean(table.getProperty(SOFT_DELETE_TABLE));
-
+ boolean tableWithSuffix = AcidUtils.isTableSoftDeleteEnabled(table, conf);
long txnId = Optional.ofNullable(SessionState.get())
.map(ss -> ss.getTxnMgr().getCurrentTxnId()).orElse(0L);
table.getTTable().setTxnId(txnId);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 8ea1f778de4..196ee478370 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -482,16 +482,21 @@ public class Cleaner extends MetaStoreCompactorThread {
}
private List<Path> remove(String location, CompactionInfo ci, List<Path> paths, boolean ifPurge, FileSystem fs)
- throws NoSuchObjectException, MetaException, IOException {
+ throws MetaException, IOException {
List<Path> deleted = new ArrayList<>();
if (paths.size() < 1) {
return deleted;
}
LOG.info(idWatermark(ci) + " About to remove " + paths.size() +
" obsolete directories from " + location + ". " + getDebugInfo(paths));
- Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
- boolean needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
-
+ boolean needCmRecycle;
+ try {
+ Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
+ needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
+ } catch (NoSuchObjectException ex) {
+ // can not drop a database which is a source of replication
+ needCmRecycle = false;
+ }
for (Path dead : paths) {
LOG.debug("Going to delete path " + dead.toString());
if (needCmRecycle) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index be717e303ae..7eb148a204d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -82,6 +82,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
+import static java.util.Arrays.asList;
+import static org.apache.commons.collections.CollectionUtils.isEqualCollection;
import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN;
/**
@@ -1605,8 +1607,16 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
Assert.assertEquals(2, r.size());
}
+ @Test
+ public void testDropWithBaseAndRecreateOnePartition() throws Exception {
+ dropWithBaseOnePartition(true);
+ }
@Test
public void testDropWithBaseOnePartition() throws Exception {
+ dropWithBaseOnePartition(false);
+ }
+
+ private void dropWithBaseOnePartition(boolean reCreate) throws Exception {
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='a') values (1,2),(3,4)");
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (5,5),(4,4)");
@@ -1617,18 +1627,14 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"),
AcidUtils.baseFileFilter);
if (1 != stat.length) {
- Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
+ Assert.fail("Expecting 1 base and found " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
Assert.assertEquals("base_0000003", name);
- stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
- AcidUtils.baseFileFilter);
- if (0 != stat.length) {
- Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat));
- }
List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
Assert.assertEquals(2, r.size());
+ Assert.assertTrue(isEqualCollection(r, asList("1\t2\ta", "3\t4\ta")));
TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
@@ -1636,13 +1642,20 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
Assert.assertTrue(resp.getCompacts().stream().anyMatch(
ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && "p=b".equals(ci.getPartitionname())));
-
+ if (reCreate) {
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (3,3)");
+ }
runCleaner(hiveConf);
-
+
stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase()),
path -> path.getName().equals("p=b"));
- if (0 != stat.length) {
- Assert.fail("Expecting partition data to be removed from FS");
+ if ((reCreate ? 1 : 0) != stat.length) {
+ Assert.fail("Partition data was " + (reCreate ? "" : "not") + " removed from FS");
+ }
+ if (reCreate) {
+ r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
+ Assert.assertEquals(3, r.size());
+ Assert.assertTrue(isEqualCollection(r, asList("1\t2\ta", "3\t4\ta", "3\t3\tb")));
}
}
@@ -1662,7 +1675,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
FileSystem fs = FileSystem.get(hiveConf);
FileStatus[] stat;
- for (char p : Arrays.asList('a', 'b')) {
+ for (char p : asList('a', 'b')) {
String partName = "p1=a/p2=a/p3=" + p;
Assert.assertTrue(resp.getCompacts().stream().anyMatch(
ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && partName.equals(ci.getPartitionname())));
@@ -1686,11 +1699,11 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
runCleaner(hiveConf);
- for (char p : Arrays.asList('a', 'b')) {
+ for (char p : asList('a', 'b')) {
stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=a"),
path -> path.getName().equals("p3=" + p));
if (0 != stat.length) {
- Assert.fail("Expecting partition data to be removed from FS");
+ Assert.fail("Partition data was not removed from FS");
}
}
}
@@ -1777,6 +1790,14 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
stat = fs.listStatus(new Path(getWarehouseDir(), "2"),
t -> t.getName().equals("data"));
Assert.assertEquals(0, stat.length);
+
+ runCleaner(hiveConf);
+
+ stat = fs.listStatus(new Path(getWarehouseDir(), database + ".db"),
+ t -> t.getName().matches("(mv_)?" + tableName + "2" + SOFT_DELETE_TABLE_PATTERN));
+ if (stat.length != 0) {
+ Assert.fail("Table data was not removed from FS");
+ }
}
@Test
@@ -1784,7 +1805,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
String tableName = "tab_acid";
runStatementOnDriver("drop table if exists " + tableName);
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, true);
-
+
runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)");
runStatementOnDriver("drop table " + tableName);
@@ -1833,7 +1854,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
String tableName = "tab_acid";
runStatementOnDriver("drop table if exists " + tableName);
- for (boolean enabled : Arrays.asList(false, true)) {
+ for (boolean enabled : asList(false, true)) {
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, enabled);
runStatementOnDriver("create table " + tableName + "(a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("insert into " + tableName + " values(1,2),(3,4)");
@@ -1922,7 +1943,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
String mviewName = "mv_" + tableName;
runStatementOnDriver("drop materialized view if exists " + mviewName);
- for (boolean enabled : Arrays.asList(false, true)) {
+ for (boolean enabled : asList(false, true)) {
runStatementOnDriver("drop table if exists " + tableName);
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_CREATE_TABLE_USE_SUFFIX, enabled);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index 9e6b669b991..b6357093f45 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.io;
+import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.AcidUtils.AcidOperationalProperties;
@@ -42,7 +44,9 @@ import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFile;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockFileSystem;
import org.apache.hadoop.hive.ql.io.orc.TestInputOutputFormat.MockPath;
import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
+import org.junit.Assert;
import org.junit.Test;
public class TestAcidUtils {
@@ -688,4 +692,22 @@ public class TestAcidUtils {
@Test
public void testGetLogicalLength() throws Exception {
}
+
+ @Test
+ public void testTableIsSoftDeleteCompliant(){
+ Table table = new Table("dummy", "test_acid");
+ table.setTableType(TableType.MANAGED_TABLE);
+
+ HiveConf conf = new HiveConf();
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true);
+
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, "true");
+ parameters.put(SOFT_DELETE_TABLE, "true");
+ table.setParameters(parameters);
+ Assert.assertTrue(AcidUtils.isTableSoftDeleteEnabled(table, conf));
+
+ parameters.remove(SOFT_DELETE_TABLE);
+ Assert.assertFalse(AcidUtils.isTableSoftDeleteEnabled(table, conf));
+ }
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index ad47f91a6d7..f9fc37dbad8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -75,13 +75,14 @@ public abstract class DbTxnManagerEndToEndTestBase {
}
SessionState.start(conf);
ctx = new Context(conf);
- driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
- driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, -1);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, false);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+
+ driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
+ driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
TestTxnDbUtil.cleanDb(conf);
SessionState ss = SessionState.get();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 82ffc315fbe..a076dec9b7d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -66,6 +67,9 @@ import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATT
import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runWorker;
import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runCleaner;
+import static java.util.Arrays.asList;
+import static org.apache.commons.collections.CollectionUtils.isEqualCollection;
+
/**
* See additional tests in {@link org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager}
* Tests here are "end-to-end"ish and simulate concurrent queries.
@@ -2429,7 +2433,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
List<String> res = new ArrayList<>();
driver.getFetchTask().fetch(res);
Assert.assertEquals(2, res.size());
- Assert.assertEquals("Lost Update", "[earl\t10, amy\t10]", res.toString());
+ Assert.assertTrue("Lost Update", isEqualCollection(res, asList("earl\t10", "amy\t10")));
}
@Test
@@ -3792,7 +3796,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
}
@Test
- public void testDropViewNoLocks() throws Exception {
+ public void testDropAlterViewNoLocks() throws Exception {
driver.run("drop view if exists v_tab_acid");
dropTable(new String[] {"tab_acid"});
@@ -3803,10 +3807,22 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
driver.run("create view v_tab_acid partitioned on (p) " +
"as select a, p from tab_acid where b > 1");
- driver.compileAndRespond("drop view if exists v_tab_acid");
+ driver.compileAndRespond("alter view v_tab_acid as select a, p from tab_acid where b < 5");
driver.lockAndRespond();
List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.SHARED_READ,
+ LockState.ACQUIRED, "default", "tab_acid", null, locks);
+ // FIXME: redundant read-lock on db level
+ checkLock(LockType.SHARED_READ,
+ LockState.ACQUIRED, "default", null, null, locks);
+ driver.close();
+
+ driver.compileAndRespond("drop view if exists v_tab_acid");
+
+ driver.lockAndRespond();
+ locks = getLocks();
Assert.assertEquals("Unexpected lock count", 0, locks.size());
}
@@ -4117,7 +4133,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
LockState.ACQUIRED, "default", "tab_acid", null, locks);
driver.close();
- driver.compileAndRespond("alter table tab_acid DROP CONSTRAINT a_PK");
+ driver.compileAndRespond("alter table tab_acid DROP CONSTRAINT a_PK");
driver.lockAndRespond();
locks = getLocks();
@@ -4251,13 +4267,13 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
Mockito.reset(driver, driver2);
driver.getFetchTask().fetch(res);
- Assert.assertEquals("[1\t2, 3\t4]", res.toString());
+ Assert.assertTrue(isEqualCollection(res, asList("1\t2", "3\t4")));
driver.run("insert into tab_acid values(5,6,7)");
driver.run("select * from tab_acid");
res = new ArrayList<>();
driver.getFetchTask().fetch(res);
- Assert.assertEquals("[1\t2\tNULL, 3\t4\tNULL, 5\t6\t7]", res.toString());
+ Assert.assertTrue(isEqualCollection(res, asList("1\t2\tNULL", "3\t4\tNULL", "5\t6\t7")));
}
@Test
@@ -4346,6 +4362,201 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
Mockito.reset(driver, driver2);
driver.getFetchTask().fetch(res);
- Assert.assertEquals("[1\t2, 3\t4]", res.toString());
+ Assert.assertTrue(isEqualCollection(res, asList("1\t2", "3\t4")));
+ }
+
+ @Test
+ public void testAlterTableClusteredBy() throws Exception {
+ dropTable(new String[] {"tab_acid"});
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true);
+
+ driver.run("create table if not exists tab_acid (a int, b int) " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')");
+ driver.run("insert into tab_acid (a,b) values(1,2),(3,4)");
+
+ driver.compileAndRespond("alter table tab_acid CLUSTERED BY(a) SORTED BY(b) INTO 32 BUCKETS", true);
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fiddler");
+ List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCL_WRITE, LockState.ACQUIRED, "default", "tab_acid", null, locks);
+
+ //simulate concurrent session
+ HiveTxnManager txnMgr2 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+ driver.compileAndRespond("insert into tab_acid (a,b) values(1,2),(3,4)", true);
+ ((DbTxnManager)txnMgr2).acquireLocks(driver.getPlan(), ctx, "Fiddler", false);
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.EXCL_WRITE, LockState.ACQUIRED, "default", "tab_acid", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "tab_acid", null, locks);
+
+ txnMgr2.rollbackTxn();
+ txnMgr.commitTxn();
+ }
+
+ @Test
+ public void testMsckRepair() throws Exception {
+ dropTable(new String[] { "tab_acid", "tab_acid_msck"});
+
+ driver.run("create table tab_acid (a int, b int) partitioned by (p string) " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')");
+ driver.run("insert into tab_acid partition(p) values (1,1,'p1'),(2,2,'p1'),(3,3,'p1')");
+ driver.run("insert into tab_acid partition(p) values (1,2,'p1'),(2,3,'p1'),(3,4,'p1')");
+
+ // Create target table
+ driver.run("create table tab_acid_msck (a int, b int) partitioned by (p string) " +
+ " stored as orc TBLPROPERTIES ('transactional'='true')");
+
+ // copy files on fs
+ FileSystem fs = FileSystem.get(conf);
+ FileUtil.copy(fs, new Path(getWarehouseDir() + "/tab_acid/p=p1"), fs,
+ new Path(getWarehouseDir(), "tab_acid_msck"), false, conf);
+
+ FileStatus[] fileStatuses = fs.listStatus(new Path(getWarehouseDir(), "tab_acid_msck/p=p1"));
+ Assert.assertEquals(2, fileStatuses.length);
+
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true);
+ // call msck repair
+ driver.compileAndRespond("msck repair table tab_acid_msck");
+ txnMgr.acquireLocks(driver.getPlan(), ctx, "Fiddler");
+ List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCL_WRITE, LockState.ACQUIRED, "default", "tab_acid_msck", null, locks);
+
+ //simulate concurrent session
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+ driver.compileAndRespond("insert into tab_acid_msck partition(p) values (1,3,'p1'),(2,4,'p1'),(3,5,'p1')", true);
+ txnMgr2.acquireLocks(driver.getPlan(), ctx, "Fiddler", false);
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 2, locks.size());
+ checkLock(LockType.EXCL_WRITE, LockState.ACQUIRED, "default", "tab_acid_msck", null, locks);
+ checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "tab_acid_msck", null, locks);
+
+ txnMgr2.rollbackTxn();
+ txnMgr.commitTxn();
+ }
+
+ @Test
+ public void testAlterTableSetPropertiesNonBlocking() throws Exception {
+ dropTable(new String[]{"tab_acid"});
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true);
+
+ driver.run("create table tab_acid (a int, b int) partitioned by (p string) " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')");
+
+ driver.compileAndRespond("alter table tab_acid set tblproperties ('DO_NOT_UPDATE_STATS'='true')");
+ driver.lockAndRespond();
+
+ List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.SHARED_READ,
+ LockState.ACQUIRED, "default", "tab_acid", null, locks);
+ driver.close();
+
+ driver.compileAndRespond("alter table tab_acid unset tblproperties ('transactional')");
+ driver.lockAndRespond();
+
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCL_WRITE,
+ LockState.ACQUIRED, "default", "tab_acid", null, locks);
+ }
+
+ @Test
+ public void testSetSerdeAndFileFormatNonBlocking() throws Exception {
+ dropTable(new String[] {"tab_acid"});
+
+ driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " +
+ "TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
+ driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')");
+
+ driver.compileAndRespond("select * from tab_acid");
+
+ HiveConf.setBoolVar(driver2.getConf(), HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true);
+ driver2 = Mockito.spy(driver2);
+
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+
+ driver2.compileAndRespond("alter table tab_acid set serde 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'");
+ driver2.lockAndRespond();
+
+ List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCL_WRITE,
+ LockState.ACQUIRED, "default", "tab_acid", null, locks);
+
+ Mockito.doNothing().when(driver2).lockAndRespond();
+ driver2.run();
+ Mockito.reset(driver2);
+
+ driver2.compileAndRespond("alter table tab_acid set fileformat rcfile");
+ driver2.lockAndRespond();
+
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCL_WRITE,
+ LockState.ACQUIRED, "default", "tab_acid", null, locks);
+
+ Mockito.doNothing().when(driver2).lockAndRespond();
+ driver2.run();
+ Mockito.reset(driver2);
+
+ driver2.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo')");
+
+ swapTxnManager(txnMgr);
+ driver.run();
+
+ List<String> res = new ArrayList<>();
+ driver.getFetchTask().fetch(res);
+ Assert.assertEquals("No records found", 2, res.size());
+ }
+
+ @Test
+ public void testMaterializedViewRebuildNoLocks() throws Exception {
+ driver.run("drop materialized view if exists mv_tab_acid");
+ dropTable(new String[]{"tab_acid"});
+
+ driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')");
+ driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')");
+
+ driver.run("create materialized view mv_tab_acid partitioned on (p) " +
+ "stored as orc TBLPROPERTIES ('transactional'='true') as select a, p from tab_acid where b > 1");
+
+ driver.compileAndRespond("alter materialized view mv_tab_acid rebuild");
+ driver.lockAndRespond();
+ List<ShowLocksResponseElement> locks = getLocks();
+ // FIXME: two rebuilds should not run in parallel
+ Assert.assertEquals("Unexpected lock count", 0, locks.size());
+ // cleanup
+ txnMgr.rollbackTxn();
+ driver.run("drop materialized view mv_tab_acid");
+ }
+
+ @Test
+ public void testMaterializedViewEnableRewriteNonBlocking() throws Exception {
+ driver.run("drop materialized view if exists mv_tab_acid");
+ dropTable(new String[]{"tab_acid"});
+
+ driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')");
+ driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')");
+
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true);
+ driver.run("create materialized view mv_tab_acid partitioned on (p) " +
+ "stored as orc TBLPROPERTIES ('transactional'='true') as select a, p from tab_acid where b > 1");
+
+ driver.compileAndRespond("alter materialized view mv_tab_acid enable rewrite");
+ driver.lockAndRespond();
+
+ List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+ checkLock(LockType.EXCL_WRITE,
+ LockState.ACQUIRED, "default", "mv_tab_acid", null, locks);
+ // cleanup
+ txnMgr.rollbackTxn();
+ driver.run("drop materialized view mv_tab_acid");
}
-}
\ No newline at end of file
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
index 7827f0ff1f6..fb433c09045 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
@@ -26,14 +26,13 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -107,7 +106,14 @@ public class TestParseUtils {
{"CREATE MATERIALIZED VIEW matview AS SELECT * FROM b", TxnType.DEFAULT},
{"ALTER MATERIALIZED VIEW matview REBUILD", TxnType.MATER_VIEW_REBUILD},
- {"ALTER MATERIALIZED VIEW matview DISABLE REWRITE", TxnType.DEFAULT}
+ {"ALTER MATERIALIZED VIEW matview DISABLE REWRITE", TxnType.DEFAULT},
+
+ {"DROP DATABASE dummy CASCADE", TxnType.SOFT_DELETE},
+ {"DROP TABLE a", TxnType.SOFT_DELETE},
+ {"DROP MATERIALIZED VIEW matview", TxnType.SOFT_DELETE},
+ {"ALTER TABLE TAB_ACID DROP PARTITION (P='FOO')", TxnType.SOFT_DELETE},
+ {"ALTER TABLE a RENAME TO b", TxnType.DEFAULT},
+ {"ALTER TABLE a PARTITION (p='foo') RENAME TO PARTITION (p='baz')", TxnType.SOFT_DELETE}
});
}
@@ -124,7 +130,25 @@ public class TestParseUtils {
txnType == TxnType.READ_ONLY ? TxnType.DEFAULT : txnType);
}
+ @Test
+ public void testTxnTypeWithLocklessReadsEnabled() throws Exception {
+ enableLocklessReadsFeature(true);
+ Assert.assertEquals(AcidUtils.getTxnType(conf, ParseUtils.parse(query,new Context(conf))), txnType);
+ }
+
+ @Test
+ public void testTxnTypeWithLocklessReadsDisabled() throws Exception {
+ enableLocklessReadsFeature(false);
+ Assert.assertEquals(AcidUtils.getTxnType(conf, ParseUtils.parse(query,new Context(conf))), TxnType.DEFAULT);
+ }
+
private void enableReadOnlyTxnFeature(boolean featureFlag) {
- conf.setBoolean(HiveConf.ConfVars.HIVE_TXN_READONLY_ENABLED.varname, featureFlag);
+ Assume.assumeTrue(txnType == TxnType.READ_ONLY || txnType == TxnType.DEFAULT);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TXN_READONLY_ENABLED, featureFlag);
+ }
+
+ private void enableLocklessReadsFeature(boolean featureFlag) {
+ Assume.assumeTrue(txnType == TxnType.SOFT_DELETE);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, featureFlag);
}
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index db8477d89bd..2d68d6acafc 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -1616,7 +1616,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
// First we delete the materialized views
Table mview = getTable(getDefaultCatalog(conf), req.getName(), table);
boolean isSoftDelete = req.isSoftDelete() && Boolean.parseBoolean(
- mview.getParameters().getOrDefault(SOFT_DELETE_TABLE, "false"));
+ mview.getParameters().get(SOFT_DELETE_TABLE));
mview.setTxnId(req.getTxnId());
dropTable(mview, req.isDeleteData() && !isSoftDelete, true, false);
}
@@ -1675,7 +1675,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
hook.preDropTable(table);
}
boolean isSoftDelete = req.isSoftDelete() && Boolean.parseBoolean(
- table.getParameters().getOrDefault(SOFT_DELETE_TABLE, "false"));
+ table.getParameters().get(SOFT_DELETE_TABLE));
EnvironmentContext context = null;
if (req.isSetTxnId()) {
context = new EnvironmentContext();
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
index 3577440c4e2..eba80212bd3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -45,6 +45,7 @@ import java.util.List;
import java.util.Optional;
import static org.apache.hadoop.hive.metastore.HMSHandler.isMustPurge;
+import static org.apache.hadoop.hive.metastore.HMSHandler.getWriteId;
import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.RENAME_PARTITION_MAKE_COPY;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.throwMetaException;
@@ -68,7 +69,8 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
// but it's more efficient to unconditionally perform cleanup for the database, especially
// when there are a lot of tables
txnHandler = getTxnHandler();
- txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null);
+ long currentTxn = getTxnId(dbEvent.getEnvironmentContext());
+ txnHandler.cleanupRecords(HiveObjectType.DATABASE, dbEvent.getDatabase(), null, null, currentTxn);
}
@Override
@@ -80,22 +82,18 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
txnHandler.cleanupRecords(HiveObjectType.TABLE, null, table, null, !tableEvent.getDeleteData());
if (!tableEvent.getDeleteData()) {
- long currentTxn = Optional.ofNullable(tableEvent.getEnvironmentContext())
- .map(EnvironmentContext::getProperties)
- .map(prop -> prop.get("txnId"))
- .map(Long::parseLong)
- .orElse(0L);
+ long currentTxn = getTxnId(tableEvent.getEnvironmentContext());
- try {
- if (currentTxn > 0) {
+ if (currentTxn > 0) {
+ try {
CompactionRequest rqst = new CompactionRequest(table.getDbName(), table.getTableName(), CompactionType.MAJOR);
rqst.setRunas(TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf));
rqst.putToProperties("location", table.getSd().getLocation());
rqst.putToProperties("ifPurge", Boolean.toString(isMustPurge(tableEvent.getEnvironmentContext(), table)));
txnHandler.submitForCleanup(rqst, table.getWriteId(), currentTxn);
+ } catch (InterruptedException | IOException e) {
+ throwMetaException(e);
}
- } catch (InterruptedException | IOException e) {
- throwMetaException(e);
}
}
}
@@ -111,36 +109,31 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, table, partitionEvent.getPartitionIterator());
if (!partitionEvent.getDeleteData()) {
- long currentTxn = Optional.ofNullable(context).map(EnvironmentContext::getProperties)
- .map(prop -> prop.get("txnId")).map(Long::parseLong)
- .orElse(0L);
+ long currentTxn = getTxnId(context);
- if (currentTxn > 0) {
- long writeId = Optional.of(context).map(EnvironmentContext::getProperties)
- .map(prop -> prop.get("writeId")).map(Long::parseLong)
- .orElse(0L);
-
- try {
- CompactionRequest rqst = new CompactionRequest(
- table.getDbName(), table.getTableName(), CompactionType.MAJOR);
- rqst.setRunas(TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf));
- rqst.putToProperties("ifPurge", Boolean.toString(isMustPurge(context, table)));
-
- Iterator<Partition> partitionIterator = partitionEvent.getPartitionIterator();
- while (partitionIterator.hasNext()) {
- Partition p = partitionIterator.next();
-
- List<FieldSchema> partCols = partitionEvent.getTable().getPartitionKeys(); // partition columns
- List<String> partVals = p.getValues();
- rqst.setPartitionname(Warehouse.makePartName(partCols, partVals));
- rqst.putToProperties("location", p.getSd().getLocation());
-
- txnHandler.submitForCleanup(rqst, writeId, currentTxn);
- }
- } catch (InterruptedException | IOException e) {
- throwMetaException(e);
+ if (currentTxn > 0) {
+ long writeId = getWriteId(context);
+ try {
+ CompactionRequest rqst = new CompactionRequest(
+ table.getDbName(), table.getTableName(), CompactionType.MAJOR);
+ rqst.setRunas(TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf));
+ rqst.putToProperties("ifPurge", Boolean.toString(isMustPurge(context, table)));
+
+ Iterator<Partition> partitionIterator = partitionEvent.getPartitionIterator();
+ while (partitionIterator.hasNext()) {
+ Partition p = partitionIterator.next();
+
+ List<FieldSchema> partCols = partitionEvent.getTable().getPartitionKeys(); // partition columns
+ List<String> partVals = p.getValues();
+ rqst.setPartitionname(Warehouse.makePartName(partCols, partVals));
+ rqst.putToProperties("location", p.getSd().getLocation());
+
+ txnHandler.submitForCleanup(rqst, writeId, currentTxn);
}
+ } catch (InterruptedException | IOException e) {
+ throwMetaException(e);
}
+ }
}
}
}
@@ -152,7 +145,7 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
}
Table oldTable = tableEvent.getOldTable();
Table newTable = tableEvent.getNewTable();
- if(!oldTable.getCatName().equalsIgnoreCase(newTable.getCatName()) ||
+ if (!oldTable.getCatName().equalsIgnoreCase(newTable.getCatName()) ||
!oldTable.getDbName().equalsIgnoreCase(newTable.getDbName()) ||
!oldTable.getTableName().equalsIgnoreCase(newTable.getTableName())) {
txnHandler = getTxnHandler();
@@ -171,7 +164,7 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
Table t = partitionEvent.getTable();
String oldPartName = Warehouse.makePartName(t.getPartitionKeys(), oldPart.getValues());
String newPartName = Warehouse.makePartName(t.getPartitionKeys(), newPart.getValues());
- if(!oldPartName.equals(newPartName)) {
+ if (!oldPartName.equals(newPartName)) {
txnHandler = getTxnHandler();
txnHandler.onRename(t.getCatName(), t.getDbName(), t.getTableName(), oldPartName,
t.getCatName(), t.getDbName(), t.getTableName(), newPartName);
@@ -179,28 +172,28 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
EnvironmentContext context = partitionEvent.getEnvironmentContext();
Table table = partitionEvent.getTable();
- boolean clonePart = Optional.ofNullable(context).map(EnvironmentContext::getProperties)
- .map(prop -> prop.get(RENAME_PARTITION_MAKE_COPY)).map(Boolean::parseBoolean)
+ boolean clonePart = Optional.ofNullable(context)
+ .map(EnvironmentContext::getProperties)
+ .map(prop -> prop.get(RENAME_PARTITION_MAKE_COPY))
+ .map(Boolean::parseBoolean)
.orElse(false);
if (clonePart) {
- long currentTxn = Optional.of(context).map(EnvironmentContext::getProperties)
- .map(prop -> prop.get("txnId")).map(Long::parseLong)
- .orElse(0L);
-
- try {
- if (currentTxn > 0) {
+ long currentTxn = getTxnId(context);
+
+ if (currentTxn > 0) {
+ try {
CompactionRequest rqst = new CompactionRequest(
table.getDbName(), table.getTableName(), CompactionType.MAJOR);
rqst.setRunas(TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf));
rqst.setPartitionname(oldPartName);
-
+
rqst.putToProperties("location", oldPart.getSd().getLocation());
rqst.putToProperties("ifPurge", Boolean.toString(isMustPurge(context, table)));
txnHandler.submitForCleanup(rqst, partitionEvent.getWriteId(), currentTxn);
+ } catch (InterruptedException | IOException e) {
+ throwMetaException(e);
}
- } catch ( InterruptedException | IOException e) {
- throwMetaException(e);
}
}
}
@@ -210,7 +203,7 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
public void onAlterDatabase(AlterDatabaseEvent dbEvent) throws MetaException {
Database oldDb = dbEvent.getOldDatabase();
Database newDb = dbEvent.getNewDatabase();
- if(!oldDb.getCatalogName().equalsIgnoreCase(newDb.getCatalogName()) ||
+ if (!oldDb.getCatalogName().equalsIgnoreCase(newDb.getCatalogName()) ||
!oldDb.getName().equalsIgnoreCase(newDb.getName())) {
txnHandler = getTxnHandler();
txnHandler.onRename(
@@ -242,4 +235,12 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
return txnHandler;
}
+
+ private long getTxnId(EnvironmentContext context) {
+ return Optional.ofNullable(context)
+ .map(EnvironmentContext::getProperties)
+ .map(prop -> prop.get("txnId"))
+ .map(Long::parseLong)
+ .orElse(0L);
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index 5b072093082..5292d0f3361 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -107,6 +107,7 @@ import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.join;
import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_PATH_SUFFIX;
+import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE_PATTERN;
import static org.apache.hadoop.hive.common.AcidConstants.SOFT_DELETE_TABLE;
import static org.apache.hadoop.hive.common.AcidConstants.DELTA_DIGITS;
@@ -1681,7 +1682,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
// add it's locations to the list of paths to delete
Path tablePath = null;
boolean isSoftDelete = req.isSoftDelete() && TxnUtils.isTransactionalTable(table)
- && Boolean.parseBoolean(table.getParameters().getOrDefault(SOFT_DELETE_TABLE, "false"));
+ && Boolean.parseBoolean(table.getParameters().get(SOFT_DELETE_TABLE));
boolean tableDataShouldBeDeleted = checkTableDataShouldBeDeleted(table, req.isDeleteData())
&& !isSoftDelete;
@@ -1720,10 +1721,16 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
if (ms.dropDatabase(req.getCatalogName(), req.getName())) {
if (!transactionalListeners.isEmpty()) {
+ DropDatabaseEvent dropEvent = new DropDatabaseEvent(db, true, this, isReplicated);
+ EnvironmentContext context = null;
+ if (!req.isDeleteManagedDir()) {
+ context = new EnvironmentContext();
+ context.putToProperties("txnId", String.valueOf(req.getTxnId()));
+ }
+ dropEvent.setEnvironmentContext(context);
transactionalListenerResponses =
MetaStoreListenerNotifier.notifyEvent(transactionalListeners,
- EventType.DROP_DATABASE,
- new DropDatabaseEvent(db, true, this, isReplicated));
+ EventType.DROP_DATABASE, dropEvent);
}
success = ms.commitTransaction();
}
@@ -1733,7 +1740,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
} else if (req.isDeleteData()) {
// Delete the data in the partitions which have other locations
deletePartitionData(partitionPaths, false, db);
- // Delete the data in the tables which have other locations
+ // Delete the data in the tables which have other locations or soft-delete is enabled
for (Path tablePath : tablePaths) {
deleteTableData(tablePath, false, db);
}
@@ -2351,13 +2358,17 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
if (!TableType.VIRTUAL_VIEW.toString().equals(tbl.getTableType())) {
if (tbl.getSd().getLocation() == null
|| tbl.getSd().getLocation().isEmpty()) {
- tblPath = wh.getDefaultTablePath(db, getTableName(tbl), isExternal(tbl));
+ tblPath = wh.getDefaultTablePath(db, tbl.getTableName() + getTableSuffix(tbl), isExternal(tbl));
} else {
if (!isExternal(tbl) && !MetaStoreUtils.isNonNativeTable(tbl)) {
LOG.warn("Location: " + tbl.getSd().getLocation()
+ " specified for non-external table:" + tbl.getTableName());
}
tblPath = wh.getDnsPath(new Path(tbl.getSd().getLocation()));
+ // ignore suffix if it's already there (direct-write CTAS)
+ if (!tblPath.getName().matches("(.*)" + SOFT_DELETE_TABLE_PATTERN)) {
+ tblPath = new Path(tblPath + getTableSuffix(tbl));
+ }
}
tbl.getSd().setLocation(tblPath.toString());
}
@@ -2503,10 +2514,10 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
}
}
- private String getTableName(Table tbl) {
- return tbl.getTableName() + (tbl.isSetTxnId() &&
- tbl.getParameters() != null && Boolean.parseBoolean(tbl.getParameters().get(SOFT_DELETE_TABLE)) ?
- SOFT_DELETE_PATH_SUFFIX + String.format(DELTA_DIGITS, tbl.getTxnId()) : "");
+ private String getTableSuffix(Table tbl) {
+ return tbl.isSetTxnId() && tbl.getParameters() != null
+ && Boolean.parseBoolean(tbl.getParameters().get(SOFT_DELETE_TABLE)) ?
+ SOFT_DELETE_PATH_SUFFIX + String.format(DELTA_DIGITS, tbl.getTxnId()) : "";
}
@Override
@@ -5099,7 +5110,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
|| MetaStoreUtils.isSkipTrash(tbl.getParameters());
}
- private long getWriteId(EnvironmentContext context){
+ static long getWriteId(EnvironmentContext context){
return Optional.ofNullable(context)
.map(EnvironmentContext::getProperties)
.map(prop -> prop.get("writeId"))
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index cb9ced7f78a..74e854944b8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -4223,7 +4223,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
@Override
@RetrySemantics.Idempotent
public void cleanupRecords(HiveObjectType type, Database db, Table table,
- Iterator<Partition> partitionIterator, boolean keepTxnToWriteIdMetaData) throws MetaException {
+ Iterator<Partition> partitionIterator, boolean keepTxnToWriteIdMetaData) throws MetaException {
+ cleanupRecords(type, db, table, partitionIterator, keepTxnToWriteIdMetaData, 0);
+ }
+
+ @Override
+ @RetrySemantics.Idempotent
+ public void cleanupRecords(HiveObjectType type, Database db, Table table,
+ Iterator<Partition> partitionIterator, long txnId) throws MetaException {
+ cleanupRecords(type, db , table, partitionIterator, false, txnId);
+ }
+
+ private void cleanupRecords(HiveObjectType type, Database db, Table table,
+ Iterator<Partition> partitionIterator, boolean keepTxnToWriteIdMetaData, long txnId) throws MetaException {
// cleanup should be done only for objects belonging to default catalog
final String defaultCatalog = getDefaultCatalog(conf);
@@ -4259,11 +4271,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
buff.append(dbName);
buff.append("'");
queries.add(buff.toString());
-
+
buff.setLength(0);
buff.append("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_DATABASE\"='");
buff.append(dbName);
- buff.append("'");
+ buff.append("' AND \"CQ_TXN_ID\"!=").append(txnId);
queries.add(buff.toString());
buff.setLength(0);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 9d2ebc7b099..cdfd95263da 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -403,6 +403,10 @@ public interface TxnStore extends Configurable {
void cleanupRecords(HiveObjectType type, Database db, Table table,
Iterator<Partition> partitionIterator, boolean keepTxnToWriteIdMetaData) throws MetaException;
+ @RetrySemantics.Idempotent
+ void cleanupRecords(HiveObjectType type, Database db, Table table,
+ Iterator<Partition> partitionIterator, long txnId) throws MetaException;
+
@RetrySemantics.Idempotent
void onRename(String oldCatName, String oldDbName, String oldTabName, String oldPartName,
String newCatName, String newDbName, String newTabName, String newPartName)
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/ThrowingTxnHandler.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/ThrowingTxnHandler.java
index 460727d7e90..72fa4e171ce 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/ThrowingTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/ThrowingTxnHandler.java
@@ -41,6 +41,15 @@ public class ThrowingTxnHandler extends CompactionTxnHandler {
super.cleanupRecords(type, db, table, partitionIterator, keepTxnToWriteIdMetaData);
}
+ @Override
+ public void cleanupRecords(HiveObjectType type, Database db, Table table,
+ Iterator<Partition> partitionIterator, long txnId) throws MetaException {
+ if (doThrow) {
+ throw new RuntimeException("during transactional cleanup");
+ }
+ super.cleanupRecords(type, db, table, partitionIterator, txnId);
+ }
+
@Override
public ShowCompactResponse showCompact(ShowCompactRequest rqst) throws MetaException {
if (doThrow) {