Posted to commits@hive.apache.org by dk...@apache.org on 2022/01/05 08:54:32 UTC
[hive] branch master updated: HIVE-25688: Non blocking DROP PARTITION implementation (Denys Kuzmenko, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 66f7540 HIVE-25688: Non blocking DROP PARTITION implementation (Denys Kuzmenko, reviewed by Peter Vary)
66f7540 is described below
commit 66f75403c27898e296cd085fe14a1d4b75c636bd
Author: Denys Kuzmenko <dk...@cloudera.com>
AuthorDate: Wed Jan 5 10:54:13 2022 +0200
HIVE-25688: Non blocking DROP PARTITION implementation (Denys Kuzmenko, reviewed by Peter Vary)
Closes #2780
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../drop/AbstractDropPartitionAnalyzer.java | 34 +--
.../drop/AlterTableDropPartitionAnalyzer.java | 9 -
.../drop/AlterTableDropPartitionDesc.java | 30 ++-
.../drop/AlterTableDropPartitionOperation.java | 16 +-
.../drop/AlterViewDropPartitionAnalyzer.java | 9 -
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 6 +
.../org/apache/hadoop/hive/ql/metadata/Hive.java | 12 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 236 ++++++++++++---------
.../hive/ql/txn/compactor/CompactorThread.java | 105 +++------
.../hadoop/hive/ql/txn/compactor/Initiator.java | 2 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 30 +--
.../org/apache/hadoop/hive/ql/TestTxnCommands.java | 117 ++++++++--
.../apache/hadoop/hive/ql/TestTxnCommands3.java | 8 +-
.../apache/hadoop/hive/ql/TestTxnConcatenate.java | 20 +-
.../org/apache/hadoop/hive/ql/TestTxnExIm.java | 6 +-
.../org/apache/hadoop/hive/ql/TestTxnLoadData.java | 50 ++---
.../apache/hadoop/hive/ql/TestTxnNoBuckets.java | 52 ++---
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 2 +
.../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 38 +++-
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 100 +++++++++
.../hive/ql/txn/compactor/TestInitiator.java | 7 +-
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 8 +-
.../src/gen/thrift/gen-cpp/hive_metastore_types.h | 3 +-
.../apache/hadoop/hive/metastore/api/TxnType.java | 5 +-
.../src/gen/thrift/gen-php/metastore/TxnType.php | 3 +
.../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 3 +
.../src/gen/thrift/gen-rb/hive_metastore_types.rb | 5 +-
.../hadoop/hive/metastore/HiveMetaStoreClient.java | 15 +-
.../hive/metastore/LockComponentBuilder.java | 4 +-
.../hive/metastore/PartitionDropOptions.java | 12 ++
.../src/main/thrift/hive_metastore.thrift | 3 +-
.../hadoop/hive/metastore/AcidEventListener.java | 53 ++++-
.../apache/hadoop/hive/metastore/HMSHandler.java | 152 +++++++------
.../hive/metastore/txn/CompactionTxnHandler.java | 3 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 71 +++++++
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 3 +
.../apache/hadoop/hive/metastore/txn/TxnUtils.java | 67 ++++++
.../hadoop/hive/common/AcidMetaDataFile.java | 4 +-
39 files changed, 898 insertions(+), 410 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b0b9b4f..fe88e04 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3071,6 +3071,11 @@ public class HiveConf extends Configuration {
"If enabled, truncate for transactional tables will not delete the data directories,\n" +
"rather create a new base directory with no datafiles."),
+ HIVE_ACID_DROP_PARTITION_USE_BASE("hive.acid.droppartition.usebase", false,
+ "Enables non-blocking DROP PARTITION operation.\n" +
+ "If enabled, drop for transactional tables will not delete the data directories,\n" +
+ "rather create a new base directory with no datafiles.\")"),
+
// Configs having to do with DeltaFilesMetricReporter, which collects lists of most recently active tables
// with the most number of active/obsolete deltas.
HIVE_TXN_ACID_METRICS_MAX_CACHE_SIZE("hive.txn.acid.metrics.max.cache.size", 100,
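Illustrative sketch, not part of the commit diff: the new flag is exercised the same way the TestTxnCommands cases added further below do it; `hiveConf` and `runStatementOnDriver` are assumed to come from the TxnCommandsBaseForTests harness.

    // Enable non-blocking ("soft delete") DROP PARTITION. The analyzer applies the same
    // behavior when HIVE_ACID_LOCKLESS_READS_ENABLED is set on a transactional table.
    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true);

    // The drop now requests a DDL_EXCL_WRITE lock instead of DDL_EXCLUSIVE, keeps the existing
    // data files and writes an empty base directory; the compactor Cleaner later removes the
    // partition directory (see the Cleaner.java changes further down).
    runStatementOnDriver("alter table acidtblpart drop partition (p='b')");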
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AbstractDropPartitionAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AbstractDropPartitionAnalyzer.java
index b727492..692732f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AbstractDropPartitionAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AbstractDropPartitionAnalyzer.java
@@ -31,10 +31,11 @@ import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.ddl.DDLWork;
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableAnalyzer;
import org.apache.hadoop.hive.ql.ddl.table.AlterTableType;
-import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
* Analyzer for drop partition commands.
*/
abstract class AbstractDropPartitionAnalyzer extends AbstractAlterTableAnalyzer {
+
AbstractDropPartitionAnalyzer(QueryState queryState) throws SemanticException {
super(queryState);
}
@@ -103,34 +105,40 @@ abstract class AbstractDropPartitionAnalyzer extends AbstractAlterTableAnalyzer
re.noLockNeeded();
inputs.add(re);
- addTableDropPartsOutputs(table, partitionSpecs.values(), !ifExists);
-
+ boolean dropPartUseBase = HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE)
+ || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED)
+ && AcidUtils.isTransactionalTable(table);
+
+ addTableDropPartsOutputs(table, partitionSpecs.values(), !ifExists, dropPartUseBase);
+
AlterTableDropPartitionDesc desc =
- new AlterTableDropPartitionDesc(tableName, partitionSpecs, mustPurge, replicationSpec);
- rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
-
- postProcess(tableName, table, desc);
+ new AlterTableDropPartitionDesc(tableName, partitionSpecs, mustPurge, replicationSpec, !dropPartUseBase, table);
+ if (desc.mayNeedWriteId()) {
+ setAcidDdlDesc(desc);
+ }
+ rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
}
protected abstract boolean expectView();
- protected abstract void postProcess(TableName tableName, Table table, AlterTableDropPartitionDesc desc);
-
/**
* Add the table partitions to be modified in the output, so that it is available for the
* pre-execution hook. If the partition does not exist, throw an error if
* throwIfNonExistent is true, otherwise ignore it.
*/
- private void addTableDropPartsOutputs(Table tab, Collection<List<ExprNodeGenericFuncDesc>> partitionSpecs,
- boolean throwIfNonExistent) throws SemanticException {
+ private void addTableDropPartsOutputs(Table table, Collection<List<ExprNodeGenericFuncDesc>> partitionSpecs,
+ boolean throwIfNonExistent, boolean dropPartUseBase) throws SemanticException {
+ WriteType writeType =
+ dropPartUseBase ? WriteType.DDL_EXCL_WRITE : WriteType.DDL_EXCLUSIVE;
+
for (List<ExprNodeGenericFuncDesc> specs : partitionSpecs) {
for (ExprNodeGenericFuncDesc partitionSpec : specs) {
List<Partition> parts = new ArrayList<>();
boolean hasUnknown = false;
try {
- hasUnknown = db.getPartitionsByExpr(tab, partitionSpec, conf, parts);
+ hasUnknown = db.getPartitionsByExpr(table, partitionSpec, conf, parts);
} catch (Exception e) {
throw new SemanticException(ErrorMsg.INVALID_PARTITION.getMsg(partitionSpec.getExprString()), e);
}
@@ -147,7 +155,7 @@ abstract class AbstractDropPartitionAnalyzer extends AbstractAlterTableAnalyzer
}
}
for (Partition p : parts) {
- outputs.add(new WriteEntity(p, WriteEntity.WriteType.DDL_EXCLUSIVE));
+ outputs.add(new WriteEntity(p, writeType));
}
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionAnalyzer.java
index f8632c7..a1c038d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionAnalyzer.java
@@ -41,13 +41,4 @@ public class AlterTableDropPartitionAnalyzer extends AbstractDropPartitionAnalyz
protected boolean expectView() {
return false;
}
-
- @Override
- protected void postProcess(TableName tableName, Table table, AlterTableDropPartitionDesc desc) {
- if (!AcidUtils.isTransactionalTable(table)) {
- return;
- }
-
- setAcidDdlDesc(desc);
- }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionDesc.java
index bb64bd6..1b3df43 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionDesc.java
@@ -25,6 +25,8 @@ import java.util.Map;
import org.apache.hadoop.hive.ql.ddl.DDLDesc.DDLDescWithWriteId;
import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
@@ -65,10 +67,18 @@ public class AlterTableDropPartitionDesc implements DDLDescWithWriteId, Serializ
private final ArrayList<PartitionDesc> partSpecs;
private final boolean ifPurge;
private final ReplicationSpec replicationSpec;
- private Long writeId;
+ private final boolean deleteData;
+ private final boolean isTransactional;
+
+ private long writeId = 0;
public AlterTableDropPartitionDesc(TableName tableName, Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs,
boolean ifPurge, ReplicationSpec replicationSpec) {
+ this(tableName, partSpecs, ifPurge, replicationSpec, true, null);
+ }
+
+ public AlterTableDropPartitionDesc(TableName tableName, Map<Integer, List<ExprNodeGenericFuncDesc>> partSpecs,
+ boolean ifPurge, ReplicationSpec replicationSpec, boolean deleteData, Table table) {
this.tableName = tableName;
this.partSpecs = new ArrayList<PartitionDesc>(partSpecs.size());
for (Map.Entry<Integer, List<ExprNodeGenericFuncDesc>> partSpec : partSpecs.entrySet()) {
@@ -79,6 +89,8 @@ public class AlterTableDropPartitionDesc implements DDLDescWithWriteId, Serializ
}
this.ifPurge = ifPurge;
this.replicationSpec = replicationSpec == null ? new ReplicationSpec() : replicationSpec;
+ this.isTransactional = AcidUtils.isTransactionalTable(table);
+ this.deleteData = deleteData;
}
@Explain(displayName = "table", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -108,13 +120,21 @@ public class AlterTableDropPartitionDesc implements DDLDescWithWriteId, Serializ
}
@Override
- public String getFullTableName() {
- return getTableName();
+ public boolean mayNeedWriteId() {
+ return isTransactional;
+ }
+
+ public long getWriteId() {
+ return writeId;
+ }
+
+ public boolean getDeleteData() {
+ return deleteData;
}
@Override
- public boolean mayNeedWriteId() {
- return true;
+ public String getFullTableName() {
+ return getTableName();
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
index e579b0e..6e702b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterTableDropPartitionOperation.java
@@ -40,8 +40,6 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.HiveTableName;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import com.google.common.collect.Iterables;
-
/**
* Operation process of dropping some partitions of a table.
*/
@@ -103,7 +101,10 @@ public class AlterTableDropPartitionOperation extends DDLOperation<AlterTableDro
} else {
for (Partition p : partitions) {
if (replicationSpec.allowEventReplacementInto(dbParams)) {
- context.getDb().dropPartition(table.getDbName(), table.getTableName(), p.getValues(), true);
+ PartitionDropOptions options =
+ PartitionDropOptions.instance().deleteData(desc.getDeleteData())
+ .setWriteId(desc.getWriteId());
+ context.getDb().dropPartition(table.getDbName(), table.getTableName(), p.getValues(), options);
}
}
}
@@ -117,7 +118,7 @@ public class AlterTableDropPartitionOperation extends DDLOperation<AlterTableDro
private void dropPartitions(boolean isRepl) throws HiveException {
// ifExists is currently verified in AlterTableDropPartitionAnalyzer
- TableName tablenName = HiveTableName.of(desc.getTableName());
+ TableName tableName = HiveTableName.of(desc.getTableName());
List<Pair<Integer, byte[]>> partitionExpressions = new ArrayList<>(desc.getPartSpecs().size());
for (AlterTableDropPartitionDesc.PartitionDesc partSpec : desc.getPartSpecs()) {
@@ -126,8 +127,9 @@ public class AlterTableDropPartitionOperation extends DDLOperation<AlterTableDro
}
PartitionDropOptions options =
- PartitionDropOptions.instance().deleteData(true).ifExists(true).purgeData(desc.getIfPurge());
- List<Partition> droppedPartitions = context.getDb().dropPartitions(tablenName.getDb(), tablenName.getTable(),
+ PartitionDropOptions.instance().deleteData(desc.getDeleteData())
+ .ifExists(true).purgeData(desc.getIfPurge());
+ List<Partition> droppedPartitions = context.getDb().dropPartitions(tableName.getDb(), tableName.getTable(),
partitionExpressions, options);
if (isRepl) {
@@ -145,7 +147,7 @@ public class AlterTableDropPartitionOperation extends DDLOperation<AlterTableDro
DDLUtils.addIfAbsentByName(new WriteEntity(partition, WriteEntity.WriteType.DDL_NO_LOCK), context);
if (llapEvictRequestBuilder != null) {
- llapEvictRequestBuilder.addPartitionOfATable(tablenName.getDb(), tablenName.getTable(), partition.getSpec());
+ llapEvictRequestBuilder.addPartitionOfATable(tableName.getDb(), tableName.getTable(), partition.getSpec());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterViewDropPartitionAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterViewDropPartitionAnalyzer.java
index 9f8961e..9fdeb56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterViewDropPartitionAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/drop/AlterViewDropPartitionAnalyzer.java
@@ -36,15 +36,6 @@ public class AlterViewDropPartitionAnalyzer extends AbstractDropPartitionAnalyze
}
@Override
- protected void postProcess(TableName tableName, Table table, AlterTableDropPartitionDesc desc) {
- if (!AcidUtils.isTransactionalTable(table)) {
- return;
- }
-
- setAcidDdlDesc(desc);
- }
-
- @Override
protected boolean expectView() {
return true;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 222e984..519c8f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -3109,6 +3109,12 @@ public class AcidUtils {
if (tree.getFirstChildWithType(HiveParser.TOK_ALTERTABLE_COMPACT) != null){
return TxnType.COMPACTION;
}
+ // check if soft delete
+ if (tree.getToken().getType() == HiveParser.TOK_ALTERTABLE_DROPPARTS
+ && (HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE)
+ || HiveConf.getBoolVar(conf, ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED))) {
+ return TxnType.SOFT_DELETE;
+ }
return TxnType.DEFAULT;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 96c60fe..e3b0997 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -58,6 +58,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
@@ -1530,7 +1531,7 @@ public class Hive {
final String metaTableName, boolean throwException) throws HiveException {
return this.getTable(dbName, tableName, metaTableName, throwException, false);
}
-
+
/**
* Returns metadata of the table
*
@@ -3804,6 +3805,15 @@ private void constructOneLBLocationMap(FileStatus fSta,
PartitionDropOptions dropOptions) throws HiveException {
try {
Table table = getTable(dbName, tableName);
+ if (!dropOptions.deleteData) {
+ AcidUtils.TableSnapshot snapshot = AcidUtils.getTableSnapshot(conf, table, true);
+ if (snapshot != null) {
+ dropOptions.setWriteId(snapshot.getWriteId());
+ }
+ long txnId = Optional.ofNullable(SessionState.get())
+ .map(ss -> ss.getTxnMgr().getCurrentTxnId()).orElse(0L);
+ dropOptions.setTxnId(txnId);
+ }
List<org.apache.hadoop.hive.metastore.api.Partition> partitions = getMSC().dropPartitions(dbName, tableName,
partitionExpressions, dropOptions);
return convertFromMetastore(table, partitions);
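Illustrative sketch, not part of the commit diff: dropping partitions without deleting data, mirroring the updated AlterTableDropPartitionOperation above; `db` (a Hive client), `dbName`, `tableName` and `partitionExpressions` are assumed to exist in the surrounding context.

    // Soft-delete drop: keep the partition's data directories and let the metastore record
    // the drop under the current transaction instead of removing files immediately.
    PartitionDropOptions options = PartitionDropOptions.instance()
        .deleteData(false)
        .ifExists(true)
        .purgeData(false);
    // Hive.dropPartitions() (hunk above) attaches the table write id from the snapshot and the
    // current txn id before forwarding the request to the metastore client.
    List<Partition> dropped = db.dropPartitions(dbName, tableName, partitionExpressions, options);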
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 38f4fec..221e99c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -19,13 +19,10 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -35,6 +32,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
@@ -42,11 +40,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.security.UserGroupInformation;
@@ -58,10 +51,13 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
@@ -137,8 +133,8 @@ public class Cleaner extends MetaStoreCompactorThread {
// when min_history_level is finally dropped, than every HMS will commit compaction the new way
// and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
for (CompactionInfo compactionInfo : readyToClean) {
- cleanerList.add(CompletableFuture.runAsync(CompactorUtil.ThrowingRunnable.unchecked(() ->
- clean(compactionInfo, cleanerWaterMark, metricsEnabled)), cleanerExecutor));
+ cleanerList.add(CompletableFuture.runAsync(ThrowingRunnable.unchecked(
+ () -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)), cleanerExecutor));
}
CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
}
@@ -184,91 +180,58 @@ public class Cleaner extends MetaStoreCompactorThread {
if (metricsEnabled) {
perfLogger.perfLogBegin(CLASS_NAME, cleanerMetric);
}
+ String location = Optional.ofNullable(ci.properties).map(StringableMap::new)
+ .map(config -> config.get("location")).orElse(null);
+
+ Callable<Boolean> cleanUpTask;
Table t = resolveTable(ci);
- if (t == null) {
- // The table was dropped before we got around to cleaning it.
- LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
- idWatermark(ci));
- txnHandler.markCleaned(ci);
- return;
- }
- if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
- // The table was marked no clean up true.
- LOG.info("Skipping table " + ci.getFullTableName() + " clean up, as NO_CLEANUP set to true");
- txnHandler.markCleaned(ci);
- return;
- }
+ Partition p = resolvePartition(ci);
- Partition p = null;
- if (ci.partName != null) {
- p = resolvePartition(ci);
- if (p == null) {
- // The partition was dropped before we got around to cleaning it.
- LOG.info("Unable to find partition " + ci.getFullPartitionName() +
- ", assuming it was dropped." + idWatermark(ci));
+ if (location == null) {
+ if (t == null) {
+ // The table was dropped before we got around to cleaning it.
+ LOG.info("Unable to find table " + ci.getFullTableName() + ", assuming it was dropped." +
+ idWatermark(ci));
txnHandler.markCleaned(ci);
return;
}
- if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
- // The partition was marked no clean up true.
- LOG.info("Skipping partition " + ci.getFullPartitionName() + " clean up, as NO_CLEANUP set to true");
+ if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
+ // The table was marked no clean up true.
+ LOG.info("Skipping table " + ci.getFullTableName() + " clean up, as NO_CLEANUP set to true");
txnHandler.markCleaned(ci);
return;
}
+ if (ci.partName != null) {
+ if (p == null) {
+ // The partition was dropped before we got around to cleaning it.
+ LOG.info("Unable to find partition " + ci.getFullPartitionName() +
+ ", assuming it was dropped." + idWatermark(ci));
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
+ // The partition was marked no clean up true.
+ LOG.info("Skipping partition " + ci.getFullPartitionName() + " clean up, as NO_CLEANUP set to true");
+ txnHandler.markCleaned(ci);
+ return;
+ }
+ }
}
-
txnHandler.markCleanerStart(ci);
-
+
StorageDescriptor sd = resolveStorageDescriptor(t, p);
- final String location = sd.getLocation();
- ValidTxnList validTxnList =
- TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
- //save it so that getAcidState() sees it
- conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
- /**
- * {@code validTxnList} is capped by minOpenTxnGLB so if
- * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
- * produced by a compactor, that means every reader that could be active right now see it
- * as well. That means if this base/delta shadows some earlier base/delta, the it will be
- * used in favor of any files that it shadows. Thus the shadowed files are safe to delete.
- *
- *
- * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
- * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
- * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
- * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
- * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
- * writeId:17. Compactor will produce base_17_c160.
- * Suppose txnid:149 writes delta_18_18
- * to P1 and aborts. Compactor can only remove TXN_COMPONENTS entries
- * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
- * not visible based on 'validTxnList' capped at minOpenTxn so it will not not be cleaned by
- * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
- * metadata that says that 18 is aborted.
- * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
- * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
- * it (since it has nothing but aborted data). But we can't tell which actually happened
- * in markCleaned() so make sure it doesn't delete meta above CG_CQ_HIGHEST_WRITE_ID.
- *
- * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
- * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
- * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID. This could be
- * useful if there is all of a sudden a flood of aborted txns. (For another day).
- */
-
- // Creating 'reader' list since we are interested in the set of 'obsolete' files
- final ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, t, validTxnList);
- LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+ cleanUpTask = () -> removeFiles(Optional.ofNullable(location).orElse(sd.getLocation()),
+ minOpenTxnGLB, ci, ci.partName != null && p == null);
Ref<Boolean> removedFiles = Ref.from(false);
if (runJobAsSelf(ci.runAs)) {
- removedFiles.value = removeFiles(location, validWriteIdList, ci);
+ removedFiles.value = cleanUpTask.call();
} else {
LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
UserGroupInformation.getLoginUser());
- ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
- removedFiles.value = removeFiles(location, validWriteIdList, ci);
+ ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ removedFiles.value = cleanUpTask.call();
return null;
});
try {
@@ -296,9 +259,9 @@ public class Cleaner extends MetaStoreCompactorThread {
}
}
- private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, Table t, ValidTxnList validTxnList)
+ private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
throws NoSuchTxnException, MetaException {
- List<String> tblNames = Collections.singletonList(TableName.getDbTable(t.getDbName(), t.getTableName()));
+ List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
request.setValidTxnList(validTxnList.writeToString());
GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
@@ -320,7 +283,7 @@ public class Cleaner extends MetaStoreCompactorThread {
}
private static boolean isDynPartAbort(Table t, CompactionInfo ci) {
- return t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0
+ return Optional.ofNullable(t).map(Table::getPartitionKeys).filter(pk -> pk.size() > 0).isPresent()
&& ci.partName == null;
}
@@ -328,14 +291,89 @@ public class Cleaner extends MetaStoreCompactorThread {
return " id=" + ci.id;
}
+ private boolean removeFiles(String location, long minOpenTxnGLB, CompactionInfo ci, boolean dropPartition)
+ throws MetaException, IOException, NoSuchObjectException, NoSuchTxnException {
+
+ if (dropPartition) {
+ LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
+ LockResponse res = null;
+
+ try {
+ res = txnHandler.lock(lockRequest);
+ if (res.getState() == LockState.ACQUIRED) {
+ if (resolvePartition(ci) == null) {
+ Path path = new Path(location);
+ StringBuilder extraDebugInfo = new StringBuilder("[").append(path.getName()).append(",");
+
+ boolean ifPurge = Optional.ofNullable(ci.properties).map(StringableMap::new)
+ .map(config -> config.get("ifPurge")).map(Boolean::valueOf).orElse(true);
+
+ return remove(location, ci, Collections.singletonList(path), ifPurge,
+ path.getFileSystem(conf), extraDebugInfo);
+ }
+ }
+ } catch (NoSuchTxnException | TxnAbortedException e) {
+ LOG.error(e.getMessage());
+ } finally {
+ if (res != null && res.getState() != LockState.NOT_ACQUIRED) {
+ try {
+ txnHandler.unlock(new UnlockRequest(res.getLockid()));
+ } catch (NoSuchLockException | TxnOpenException e) {
+ LOG.error(e.getMessage());
+ }
+ }
+ }
+ }
+
+ ValidTxnList validTxnList =
+ TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+ //save it so that getAcidState() sees it
+ conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+ /**
+ * {@code validTxnList} is capped by minOpenTxnGLB so if
+ * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
+ * produced by a compactor, that means every reader that could be active right now see it
+ * as well. That means if this base/delta shadows some earlier base/delta, the it will be
+ * used in favor of any files that it shadows. Thus the shadowed files are safe to delete.
+ *
+ *
+ * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
+ * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
+ * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
+ * For example given partition P1, txnid:150 starts and sees txnid:149 as open.
+ * Say compactor runs in txnid:160, but 149 is still open and P1 has the largest resolved
+ * writeId:17. Compactor will produce base_17_c160.
+ * Suppose txnid:149 writes delta_18_18
+ * to P1 and aborts. Compactor can only remove TXN_COMPONENTS entries
+ * up to (inclusive) writeId:17 since delta_18_18 may be on disk (and perhaps corrupted) but
+ * not visible based on 'validTxnList' capped at minOpenTxn so it will not not be cleaned by
+ * {@link #removeFiles(String, ValidWriteIdList, CompactionInfo)} and so we must keep the
+ * metadata that says that 18 is aborted.
+ * In a slightly different case, whatever txn created delta_18 (and all other txn) may have
+ * committed by the time cleaner runs and so cleaner will indeed see delta_18_18 and remove
+ * it (since it has nothing but aborted data). But we can't tell which actually happened
+ * in markCleaned() so make sure it doesn't delete meta above CG_CQ_HIGHEST_WRITE_ID.
+ *
+ * We could perhaps make cleaning of aborted and obsolete and remove all aborted files up
+ * to the current Min Open Write Id, this way aborted TXN_COMPONENTS meta can be removed
+ * as well up to that point which may be higher than CQ_HIGHEST_WRITE_ID. This could be
+ * useful if there is all of a sudden a flood of aborted txns. (For another day).
+ */
+
+ // Creating 'reader' list since we are interested in the set of 'obsolete' files
+ ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
+ LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+ return removeFiles(location, validWriteIdList, ci);
+ }
/**
* @return true if any files were removed
*/
private boolean removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
throws IOException, NoSuchObjectException, MetaException {
- Path locPath = new Path(location);
- AcidDirectory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from(
- false), false);
+ Path path = new Path(location);
+ FileSystem fs = path.getFileSystem(conf);
+ AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false);
List<Path> obsoleteDirs = dir.getObsolete();
/**
* add anything in 'dir' that only has data from aborted transactions - no one should be
@@ -356,16 +394,15 @@ public class Cleaner extends MetaStoreCompactorThread {
// Including obsolete directories for partitioned tables can result in data loss.
obsoleteDirs = dir.getAbortedDirectories();
}
- List<Path> filesToDelete = new ArrayList<>(obsoleteDirs.size());
+ StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
+ .map(Path::getName).collect(Collectors.joining(",")));
+ return remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
+ }
- StringBuilder extraDebugInfo = new StringBuilder("[");
- for (Path stat : obsoleteDirs) {
- filesToDelete.add(stat);
- extraDebugInfo.append(stat.getName()).append(",");
- if (!FileUtils.isPathWithinSubtree(stat, locPath)) {
- LOG.info(idWatermark(ci) + " found unexpected file: " + stat);
- }
- }
+ private boolean remove(String location, CompactionInfo ci, List<Path> filesToDelete, boolean ifPurge,
+ FileSystem fs, StringBuilder extraDebugInfo)
+ throws NoSuchObjectException, MetaException, IOException {
+
extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
" obsolete directories from " + location + ". " + extraDebugInfo.toString());
@@ -374,16 +411,15 @@ public class Cleaner extends MetaStoreCompactorThread {
", that hardly seems right.");
return false;
}
-
- FileSystem fs = dir.getFs();
Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
-
+ boolean needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
+
for (Path dead : filesToDelete) {
LOG.debug("Going to delete path " + dead.toString());
- if (ReplChangeManager.shouldEnableCm(db, table)) {
- replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, true);
+ if (needCmRecycle) {
+ replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, ifPurge);
}
- fs.delete(dead, true);
+ FileUtils.moveToTrash(fs, dead, conf, ifPurge);
}
return true;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 7ee8f8b..9b5affa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -20,28 +20,26 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
+
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -148,74 +146,12 @@ public abstract class CompactorThread extends Thread implements Configurable {
protected StorageDescriptor resolveStorageDescriptor(Table t, Partition p) {
return (p == null) ? t.getSd() : p.getSd();
}
-
- /**
- * Determine which user to run an operation as. If metastore.compactor.run.as.user is set, that user will be
- * returned; if not: the the owner of the directory to be compacted.
- * It is asserted that either the user running the hive metastore or the table
- * owner must be able to stat the directory and determine the owner.
- * @param location directory that will be read or written to.
- * @param t metastore table object
- * @return metastore.compactor.run.as.user value; or if that is not set: username of the owner of the location.
- * @throws java.io.IOException if neither the hive metastore user nor the table owner can stat
- * the location.
- */
- protected String findUserToRunAs(String location, Table t) throws IOException,
- InterruptedException {
- LOG.debug("Determining who to run the job as.");
-
- // check if a specific user is set in config
- String runUserAs = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.COMPACTOR_RUN_AS_USER);
- if (runUserAs != null && !"".equals(runUserAs)) {
- return runUserAs;
- }
-
- // get table directory owner
- final Path p = new Path(location);
- final FileSystem fs = p.getFileSystem(conf);
- try {
- FileStatus stat = fs.getFileStatus(p);
- LOG.debug("Running job as " + stat.getOwner());
- return stat.getOwner();
- } catch (AccessControlException e) {
- // TODO not sure this is the right exception
- LOG.debug("Unable to stat file as current user, trying as table owner");
-
- // Now, try it as the table owner and see if we get better luck.
- final List<String> wrapper = new ArrayList<>(1);
- UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
- UserGroupInformation.getLoginUser());
- ugi.doAs(new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws Exception {
- // need to use a new filesystem object here to have the correct ugi
- FileSystem proxyFs = p.getFileSystem(conf);
- FileStatus stat = proxyFs.getFileStatus(p);
- wrapper.add(stat.getOwner());
- return null;
- }
- });
- try {
- FileSystem.closeAllForUGI(ugi);
- } catch (IOException exception) {
- LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
- }
-
- if (wrapper.size() == 1) {
- LOG.debug("Running job as " + wrapper.get(0));
- return wrapper.get(0);
- }
- }
- LOG.error("Unable to stat file " + p + " as either current user(" + UserGroupInformation.getLoginUser() +
- ") or table owner(" + t.getOwner() + "), giving up");
- throw new IOException("Unable to stat file: " + p);
- }
-
+
/**
* Determine whether to run this job as the current user or whether we need a doAs to switch
* users.
* @param owner of the directory we will be working in, as determined by
- * {@link #findUserToRunAs(String, org.apache.hadoop.hive.metastore.api.Table)}
+ * {@link TxnUtils#findUserToRunAs(String, org.apache.hadoop.hive.metastore.api.Table)}
* @return true if the job should run as the current user, false if a doAs is needed.
*/
protected boolean runJobAsSelf(String owner) {
@@ -250,4 +186,27 @@ public abstract class CompactorThread extends Thread implements Configurable {
protected String getRuntimeVersion() {
return this.getClass().getPackage().getImplementationVersion();
}
+
+ protected LockRequest createLockRequest(CompactionInfo ci, long txnId, LockType lockType, DataOperationType opType) {
+ String agentInfo = Thread.currentThread().getName();
+ LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
+ requestBuilder.setUser(ci.runAs);
+ requestBuilder.setTransactionId(txnId);
+
+ LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+ .setLock(lockType)
+ .setOperationType(opType)
+ .setDbName(ci.dbname)
+ .setTableName(ci.tableName)
+ .setIsTransactional(true);
+
+ if (ci.partName != null) {
+ lockCompBuilder.setPartitionName(ci.partName);
+ }
+ requestBuilder.addLockComponent(lockCompBuilder.build());
+
+ requestBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) ||
+ !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
+ return requestBuilder.build();
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index bffa773..071cbbe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -262,7 +262,7 @@ public class Initiator extends MetaStoreCompactorThread {
String user = cache.get(fullTableName);
if (user == null) {
- user = findUserToRunAs(sd.getLocation(), t);
+ user = TxnUtils.findUserToRunAs(sd.getLocation(), t, conf);
cache.put(fullTableName, user);
}
return user;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index a90e307..08910ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -27,12 +27,11 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.MetaStoreThread;
-import org.apache.hadoop.hive.metastore.LockComponentBuilder;
-import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -438,7 +437,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
}
if (ci.runAs == null) {
- ci.runAs = findUserToRunAs(sd.getLocation(), t);
+ ci.runAs = TxnUtils.findUserToRunAs(sd.getLocation(), t, conf);
}
checkInterrupt();
@@ -560,29 +559,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
return true;
}
- private LockRequest createLockRequest(CompactionInfo ci, long txnId) {
- String agentInfo = Thread.currentThread().getName();
- LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
- requestBuilder.setUser(ci.runAs);
- requestBuilder.setTransactionId(txnId);
-
- LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
- .setSharedRead()
- .setOperationType(DataOperationType.SELECT)
- .setDbName(ci.dbname)
- .setTableName(ci.tableName)
- .setIsTransactional(true);
-
- if (ci.partName != null) {
- lockCompBuilder.setPartitionName(ci.partName);
- }
- requestBuilder.addLockComponent(lockCompBuilder.build());
-
- requestBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) ||
- !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
- return requestBuilder.build();
- }
-
/**
* Just AcidUtils.getAcidState, but with impersonation if needed.
*/
@@ -719,7 +695,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
this.txnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
status = TxnStatus.OPEN;
- LockRequest lockRequest = createLockRequest(ci, txnId);
+ LockRequest lockRequest = createLockRequest(ci, txnId, LockType.SHARED_READ, DataOperationType.SELECT);
LockResponse res = msc.lock(lockRequest);
if (res.getState() != LockState.ACQUIRED) {
throw new TException("Unable to acquire lock(s) on {" + ci.getFullPartitionName()
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 5b18b8d..ddc7530 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -107,6 +107,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
//TestTxnCommandsWithSplitUpdateAndVectorization has the vectorized version
//of these tests.
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, false);
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, false);
}
@@ -975,7 +976,7 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
}
}
Assert.assertNotNull(txnInfo);
- Assert.assertEquals(14, txnInfo.getId());
+ Assert.assertEquals(16, txnInfo.getId());
Assert.assertEquals(TxnState.OPEN, txnInfo.getState());
String s = TestTxnDbUtil
.queryToString(hiveConf, "select TXN_STARTED, TXN_LAST_HEARTBEAT from TXNS where TXN_ID = " + txnInfo.getId(), false);
@@ -1402,10 +1403,10 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
query = "select ROW__ID, a, b" + (isVectorized ? "" : ", INPUT__FILE__NAME") + " from "
+ Table.NONACIDORCTBL + " order by ROW__ID";
String[][] expected2 = new String[][] {
- {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/base_10000001_v0000019/bucket_00001"},
- {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/base_10000001_v0000019/bucket_00001"},
- {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/base_10000001_v0000019/bucket_00001"},
- {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/base_10000001_v0000019/bucket_00001"}
+ {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "nonacidorctbl/base_10000001_v0000021/bucket_00001"},
+ {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":1}\t0\t12", "nonacidorctbl/base_10000001_v0000021/bucket_00001"},
+ {"{\"writeid\":0,\"bucketid\":536936448,\"rowid\":2}\t1\t5", "nonacidorctbl/base_10000001_v0000021/bucket_00001"},
+ {"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t1\t17", "nonacidorctbl/base_10000001_v0000021/bucket_00001"}
};
checkResult(expected2, query, isVectorized, "after major compact", LOG);
//make sure they are the same before and after compaction
@@ -1531,8 +1532,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
runStatementOnDriver("truncate table " + Table.ACIDTBL);
FileSystem fs = FileSystem.get(hiveConf);
- FileStatus[] stat =
- fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBL.toString().toLowerCase()), AcidUtils.baseFileFilter);
+ FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBL.toString().toLowerCase()),
+ AcidUtils.baseFileFilter);
if (1 != stat.length) {
Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
}
@@ -1552,8 +1553,8 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
runStatementOnDriver("truncate table " + Table.ACIDTBLPART);
FileSystem fs = FileSystem.get(hiveConf);
- FileStatus[] stat =
- fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"), AcidUtils.baseFileFilter);
+ FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
+ AcidUtils.baseFileFilter);
if (1 != stat.length) {
Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
}
@@ -1573,15 +1574,15 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
runStatementOnDriver("truncate table " + Table.ACIDTBLPART + " partition(p='b')");
FileSystem fs = FileSystem.get(hiveConf);
- FileStatus[] stat =
- fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"), AcidUtils.baseFileFilter);
+ FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"),
+ AcidUtils.baseFileFilter);
if (1 != stat.length) {
Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
}
String name = stat[0].getPath().getName();
Assert.assertEquals("base_0000003", name);
- stat =
- fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"), AcidUtils.deltaFileFilter);
+ stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
+ AcidUtils.deltaFileFilter);
if (1 != stat.length) {
Assert.fail("Expecting 1 delta and found " + stat.length + " files " + Arrays.toString(stat));
}
@@ -1589,4 +1590,94 @@ public class TestTxnCommands extends TxnCommandsBaseForTests {
List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
Assert.assertEquals(2, r.size());
}
+
+ @Test
+ public void testDropWithBaseOnePartition() throws Exception {
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='a') values (1,2),(3,4)");
+ runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition (p='b') values (5,5),(4,4)");
+
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true);
+ runStatementOnDriver("alter table " + Table.ACIDTBLPART + " drop partition (p='b')");
+
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=b"),
+ AcidUtils.baseFileFilter);
+ if (1 != stat.length) {
+ Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
+ }
+ String name = stat[0].getPath().getName();
+ Assert.assertEquals("base_0000003", name);
+ stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase() + "/p=a"),
+ AcidUtils.baseFileFilter);
+ if (0 != stat.length) {
+ Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat));
+ }
+
+ List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLPART);
+ Assert.assertEquals(2, r.size());
+
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+ ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+
+ Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
+ Assert.assertTrue(resp.getCompacts().stream().anyMatch(
+ ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && "p=b".equals(ci.getPartitionname())));
+
+ runCleaner(hiveConf);
+
+ stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLPART.toString().toLowerCase()),
+ path -> path.getName().equals("p=b"));
+ if (0 != stat.length) {
+ Assert.fail("Expecting partition data to be removed from FS");
+ }
+ }
+
+ @Test
+ public void testDropWithBaseMultiplePartitions() throws Exception {
+ runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='a', p3='a') values (1,1),(2,2)");
+ runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='a', p3='b') values (3,3),(4,4)");
+ runStatementOnDriver("insert into " + Table.ACIDTBLNESTEDPART + " partition (p1='a', p2='b', p3='c') values (7,7),(8,8)");
+
+ HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true);
+ runStatementOnDriver("alter table " + Table.ACIDTBLNESTEDPART + " drop partition (p2='a')");
+
+ TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+ ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
+
+ FileSystem fs = FileSystem.get(hiveConf);
+ FileStatus[] stat;
+
+ for (char p : Arrays.asList('a', 'b')) {
+ String partName = "p1=a/p2=a/p3=" + p;
+ Assert.assertTrue(resp.getCompacts().stream().anyMatch(
+ ci -> TxnStore.CLEANING_RESPONSE.equals(ci.getState()) && partName.equals(ci.getPartitionname())));
+
+ stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/" + partName),
+ AcidUtils.baseFileFilter);
+ if (1 != stat.length) {
+ Assert.fail("Expecting 1 base and found " + stat.length + " files " + Arrays.toString(stat));
+ }
+ String name = stat[0].getPath().getName();
+ Assert.assertEquals("base_0000004", name);
+ }
+ stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=b/p3=c"),
+ AcidUtils.baseFileFilter);
+ if (0 != stat.length) {
+ Assert.fail("Expecting no base and found " + stat.length + " files " + Arrays.toString(stat));
+ }
+
+ List<String> r = runStatementOnDriver("select * from " + Table.ACIDTBLNESTEDPART);
+ Assert.assertEquals(2, r.size());
+
+ runCleaner(hiveConf);
+
+ for (char p : Arrays.asList('a', 'b')) {
+ stat = fs.listStatus(new Path(getWarehouseDir(), Table.ACIDTBLNESTEDPART.toString().toLowerCase() + "/p1=a/p2=a"),
+ path -> path.getName().equals("p3=" + p));
+ if (0 != stat.length) {
+ Assert.fail("Expecting partition data to be removed from FS");
+ }
+ }
+ }
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 8345832..64ac224 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -346,7 +346,7 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
├── delta_0000001_0000001_0000
│ ├── _orc_acid_version
│ └── bucket_00000
- ├── delta_0000001_0000002_v0000018
+ ├── delta_0000001_0000002_v0000020
│ ├── _orc_acid_version
│ └── bucket_00000
└── delta_0000002_0000002_0000
@@ -358,7 +358,7 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
FileUtils.HIDDEN_FILES_PATH_FILTER);
String[] expectedList = new String[] {
- "/t/delta_0000001_0000002_v0000018",
+ "/t/delta_0000001_0000002_v0000020",
"/t/delta_0000001_0000001_0000",
"/t/delta_0000002_0000002_0000",
};
@@ -385,14 +385,14 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'minor'");
runWorker(hiveConf);
/*
- at this point delta_0000001_0000003_v0000020 is visible to everyone
+ at this point delta_0000001_0000003_v0000022 is visible to everyone
so cleaner removes all files shadowed by it (which is everything in this case)
*/
runCleaner(hiveConf);
runCleaner(hiveConf);
expectedList = new String[] {
- "/t/delta_0000001_0000003_v0000020"
+ "/t/delta_0000001_0000003_v0000022"
};
actualList = fs.listStatus(new Path(warehousePath + "/t"),
FileUtils.HIDDEN_FILES_PATH_FILTER);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
index 5daf5ac..1c16bb9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -79,13 +79,13 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
String[][] expected2 = new String[][] {
{"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
- "acidtbl/base_0000003_v0000019/bucket_00001"},
+ "acidtbl/base_0000003_v0000021/bucket_00001"},
{"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t4\t4",
- "acidtbl/base_0000003_v0000019/bucket_00001"},
+ "acidtbl/base_0000003_v0000021/bucket_00001"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
- "acidtbl/base_0000003_v0000019/bucket_00001"},
+ "acidtbl/base_0000003_v0000021/bucket_00001"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t8\t8",
- "acidtbl/base_0000003_v0000019/bucket_00001"}};
+ "acidtbl/base_0000003_v0000021/bucket_00001"}};
checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
}
@Test
@@ -120,11 +120,11 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
String[][] expected2 = new String[][] {
{"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
- "acidtblpart/p=p1/base_0000003_v0000019/bucket_00001"},
+ "acidtblpart/p=p1/base_0000003_v0000021/bucket_00001"},
{"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
"acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001_0"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
- "acidtblpart/p=p1/base_0000003_v0000019/bucket_00001"},
+ "acidtblpart/p=p1/base_0000003_v0000021/bucket_00001"},
{"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
"acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001_0"}};
@@ -160,10 +160,10 @@ public class TestTxnConcatenate extends TxnCommandsBaseForTests {
Assert.assertEquals(1, rsp.getCompactsSize());
Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
String[][] expected2 = new String[][] {
- {"1\t2", "t/base_0000003_v0000020/000000_0"},
- {"4\t5", "t/base_0000003_v0000020/000000_0"},
- {"5\t6", "t/base_0000003_v0000020/000000_0"},
- {"8\t8", "t/base_0000003_v0000020/000000_0"}};
+ {"1\t2", "t/base_0000003_v0000022/000000_0"},
+ {"4\t5", "t/base_0000003_v0000022/000000_0"},
+ {"5\t6", "t/base_0000003_v0000022/000000_0"},
+ {"8\t8", "t/base_0000003_v0000022/000000_0"}};
checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
index 80948dc..34affbf 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnExIm.java
@@ -346,11 +346,11 @@ target/tmp/org.apache.hadoop.hive.ql.TestTxnCommands-1521148657811/
TestTxnCommands2.runWorker(hiveConf);
String[][] expected3 = new String[][] {
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2",
- ".*t/delta_0000001_0000002_v000002[4-5]/bucket_00000"},
+ ".*t/delta_0000001_0000002_v000002[6-7]/bucket_00000"},
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4",
- ".*t/delta_0000001_0000002_v000002[4-5]/bucket_00000"},
+ ".*t/delta_0000001_0000002_v000002[6-7]/bucket_00000"},
{"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t6",
- ".*t/delta_0000001_0000002_v000002[4-5]/bucket_00000"}};
+ ".*t/delta_0000001_0000002_v000002[6-7]/bucket_00000"}};
checkResult(expected3, testQuery, isVectorized, "minor compact imported table");
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
index 67787a6..9d3714e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnLoadData.java
@@ -139,8 +139,8 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'minor'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected3 = new String[][] {
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004_v0000032/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004_v0000032/bucket_00000"}
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/delta_0000001_0000004_v0000034/bucket_00000"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/delta_0000001_0000004_v0000034/bucket_00000"}
};
checkResult(expected3, testQuery, isVectorized, "delete compact minor");
@@ -173,9 +173,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected6 = new String[][]{
- {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009_v0000046/bucket_00000"},
- {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009_v0000046/bucket_00000"},
- {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000046/bucket_00000"}
+ {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t1\t17", "t/base_0000009_v0000048/bucket_00000"},
+ {"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t1\t17", "t/base_0000009_v0000048/bucket_00000"},
+ {"{\"writeid\":9,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000009_v0000048/bucket_00000"}
};
checkResult(expected6, testQuery, isVectorized, "load data inpath compact major");
}
@@ -210,10 +210,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'minor'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected1 = new String[][] {
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002_v0000024/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002_v0000024/bucket_00000"},
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002_v0000024/bucket_00000"},
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002_v0000024/bucket_00000"}
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/delta_0000001_0000002_v0000026/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/delta_0000001_0000002_v0000026/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/delta_0000001_0000002_v0000026/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/delta_0000001_0000002_v0000026/bucket_00000"}
};
checkResult(expected1, testQuery, isVectorized, "load data inpath (minor)");
@@ -222,11 +222,11 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected2 = new String[][] {
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003_v0000028/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003_v0000028/bucket_00000"},
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003_v0000028/bucket_00000"},
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003_v0000028/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003_v0000028/bucket_00000"}
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t0\t2", "t/base_0000003_v0000030/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t0\t4", "t/base_0000003_v0000030/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000003_v0000030/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000003_v0000030/bucket_00000"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t2\t2", "t/base_0000003_v0000030/bucket_00000"}
};
checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
@@ -244,9 +244,9 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected4 = new String[][] {
- {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005_v0000037/bucket_00000"},
- {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005_v0000037/bucket_00000"},
- {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005_v0000037/bucket_00000"}};
+ {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t5\t6", "t/base_0000005_v0000039/bucket_00000"},
+ {"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":1}\t7\t8", "t/base_0000005_v0000039/bucket_00000"},
+ {"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t6\t6", "t/base_0000005_v0000039/bucket_00000"}};
checkResult(expected4, testQuery, isVectorized, "load data inpath overwrite (major)");
}
/**
@@ -325,13 +325,13 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
String[][] expected3 = new String[][] {
{"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
- "t/base_10000003_v0000034/bucket_00000"},
+ "t/base_10000003_v0000036/bucket_00000"},
{"{\"writeid\":10000002,\"bucketid\":536870912,\"rowid\":1}\t7\t8",
- "t/base_10000003_v0000034/bucket_00000"},
+ "t/base_10000003_v0000036/bucket_00000"},
{"{\"writeid\":10000002,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
- "t/base_10000003_v0000034/bucket_00001"},
+ "t/base_10000003_v0000036/bucket_00001"},
{"{\"writeid\":10000003,\"bucketid\":536870912,\"rowid\":0}\t9\t9",
- "t/base_10000003_v0000034/bucket_00000"}
+ "t/base_10000003_v0000036/bucket_00000"}
};
checkResult(expected3, testQuery, isVectorized, "load data inpath overwrite (major)");
}
@@ -454,10 +454,10 @@ public class TestTxnLoadData extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table T compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
String[][] expected2 = new String[][] {
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001_v0000022/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001_v0000022/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001_v0000022/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001_v0000022/bucket_00000"}
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2", "t/base_0000001_v0000024/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t4", "t/base_0000001_v0000024/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":0}\t5\t5", "t/base_0000001_v0000024/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870913,\"rowid\":1}\t6\t6", "t/base_0000001_v0000024/bucket_00000"}
};
checkResult(expected2, testQuery, isVectorized, "load data inpath (major)");
//at lest for now, Load Data w/Overwrite is not allowed in a txn: HIVE-18154
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
index da670c8..2aa8840 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnNoBuckets.java
@@ -168,10 +168,10 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
*/
String expected[][] = {
- {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000"},
- {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000"}
+ {"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t0\t0\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00000"},
+ {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t1\t17", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00001"},
+ {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t2\t2\t2", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00001"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t3\t3\t3", NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00000"}
};
checkResult(expected,
"select ROW__ID, c1, c2, c3" + (shouldVectorize() ? "" : ", INPUT__FILE__NAME")
@@ -186,8 +186,8 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000001_0000001_0000/bucket_00001_0");
expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00000_0");
expectedFiles.add(NO_BUCKETS_TBL_NAME + "/delta_0000002_0000002_0000/bucket_00001_0");
- expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000");
- expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00000");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00001");
assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
TestTxnCommands2.runCleaner(hiveConf);
@@ -196,8 +196,8 @@ public class TestTxnNoBuckets extends TxnCommandsBaseForTests {
Assert.assertEquals("Unexpected result after clean", stringifyValues(result), rs);
expectedFiles.clear();
- expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00000");
- expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000024/bucket_00001");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00000");
+ expectedFiles.add(NO_BUCKETS_TBL_NAME + "/base_0000002_v0000026/bucket_00001");
assertExpectedFileSet(expectedFiles, getWarehouseDir() + "/" + NO_BUCKETS_TBL_NAME, NO_BUCKETS_TBL_NAME);
}
@@ -478,23 +478,23 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
/*Compaction preserves location of rows wrt buckets/tranches (for now)*/
String expected4[][] = {
{"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":0}\t1\t2",
- "warehouse/t/base_10000002_v0000028/bucket_00002"},
+ "warehouse/t/base_10000002_v0000030/bucket_00002"},
{"{\"writeid\":0,\"bucketid\":537001984,\"rowid\":1}\t2\t4",
- "warehouse/t/base_10000002_v0000028/bucket_00002"},
+ "warehouse/t/base_10000002_v0000030/bucket_00002"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":0}\t5\t6",
- "warehouse/t/base_10000002_v0000028/bucket_00000"},
+ "warehouse/t/base_10000002_v0000030/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t9\t10",
- "warehouse/t/base_10000002_v0000028/bucket_00000"},
+ "warehouse/t/base_10000002_v0000030/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t10\t20",
- "warehouse/t/base_10000002_v0000028/bucket_00000"},
+ "warehouse/t/base_10000002_v0000030/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t12\t12",
- "warehouse/t/base_10000002_v0000028/bucket_00000"},
+ "warehouse/t/base_10000002_v0000030/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t20\t40",
- "warehouse/t/base_10000002_v0000028/bucket_00000"},
+ "warehouse/t/base_10000002_v0000030/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":5}\t50\t60",
- "warehouse/t/base_10000002_v0000028/bucket_00000"},
+ "warehouse/t/base_10000002_v0000030/bucket_00000"},
{"{\"writeid\":10000001,\"bucketid\":536936448,\"rowid\":0}\t60\t88",
- "warehouse/t/base_10000002_v0000028/bucket_00001"},
+ "warehouse/t/base_10000002_v0000030/bucket_00001"},
};
checkExpected(rs, expected4,"after major compact");
}
@@ -750,15 +750,15 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
rs = runStatementOnDriver(query);
String[][] expected5 = {//the row__ids are the same after compaction
{"{\"writeid\":10000001,\"bucketid\":536870912,\"rowid\":0}\t1\t17",
- "warehouse/t/base_10000001_v0000028/bucket_00000"},
+ "warehouse/t/base_10000001_v0000030/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":1}\t2\t4",
- "warehouse/t/base_10000001_v0000028/bucket_00000"},
+ "warehouse/t/base_10000001_v0000030/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":2}\t5\t6",
- "warehouse/t/base_10000001_v0000028/bucket_00000"},
+ "warehouse/t/base_10000001_v0000030/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":3}\t6\t8",
- "warehouse/t/base_10000001_v0000028/bucket_00000"},
+ "warehouse/t/base_10000001_v0000030/bucket_00000"},
{"{\"writeid\":0,\"bucketid\":536870912,\"rowid\":4}\t9\t10",
- "warehouse/t/base_10000001_v0000028/bucket_00000"}
+ "warehouse/t/base_10000001_v0000030/bucket_00000"}
};
checkExpected(rs, expected5, "After major compaction");
//vectorized because there is INPUT__FILE__NAME
@@ -820,10 +820,10 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree /Users/ekoifman/dev/hiver
{"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t4\t3", "t/p=1/q=1/delta_0000001_0000001_0000/bucket_00000_0"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t1\t5\t1", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
{"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t1\t5\t3", "t/p=1/q=1/delta_0000003_0000003_0000/bucket_00000_0"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"},
- {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"},
- {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/base_0000003_v0000019/bucket_00000"}
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t4\t2", "t/p=1/q=2/base_0000003_v0000021/bucket_00000"},
+ {"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t4\t4", "t/p=1/q=2/base_0000003_v0000021/bucket_00000"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t1\t2\t5\t2", "t/p=1/q=2/base_0000003_v0000021/bucket_00000"},
+ {"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":1}\t1\t2\t5\t4", "t/p=1/q=2/base_0000003_v0000021/bucket_00000"}
};
checkExpected(rs, expected2, "after major compaction");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index e86e2d5..05e45f5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -72,6 +72,7 @@ public abstract class TxnCommandsBaseForTests {
public enum Table {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart"),
+ ACIDTBLNESTEDPART("acidTblNestedPart"),
ACIDTBL2("acidTbl2"),
NONACIDORCTBL("nonAcidOrcTbl"),
NONACIDORCTBL2("nonAcidOrcTbl2"),
@@ -147,6 +148,7 @@ public abstract class TxnCommandsBaseForTests {
protected void setUpSchema() throws Exception {
runStatementOnDriver("create table " + Table.ACIDTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("create table " + Table.ACIDTBLPART + "(a int, b int) partitioned by (p string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+ runStatementOnDriver("create table " + Table.ACIDTBLNESTEDPART + "(a int, b int) partitioned by (p1 string, p2 string, p3 string) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
runStatementOnDriver("create table " + Table.NONACIDORCTBL + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create table " + Table.NONACIDORCTBL2 + "(a int, b int) clustered by (a) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='false')");
runStatementOnDriver("create temporary table " + Table.ACIDTBL2 + "(a int, b int, c int) clustered by (c) into " + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index 12e2348..9eb9e1b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hive.ql.lockmgr;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClientWithLocalCache;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -32,11 +34,18 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
+import java.io.File;
+
/**
* Base class for "end-to-end" tests for DbTxnManager and simulate concurrent queries.
*/
public abstract class DbTxnManagerEndToEndTestBase {
+ private static final String TEST_DATA_DIR = new File(
+ System.getProperty("java.io.tmpdir") + File.separator +
+ DbTxnManagerEndToEndTestBase.class.getCanonicalName() + "-" + System.currentTimeMillis())
+ .getPath().replaceAll("\\\\", "/");
+
protected static HiveConf conf = new HiveConf(Driver.class);
protected HiveTxnManager txnMgr;
protected Context ctx;
@@ -44,10 +53,12 @@ public abstract class DbTxnManagerEndToEndTestBase {
protected TxnStore txnHandler;
public DbTxnManagerEndToEndTestBase() {
- conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
- "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
- conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
- conf.setBoolVar(HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, true);
+ HiveConf.setVar(conf, HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_MERGE_INSERT_X_LOCK, true);
+
+ MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE, getWarehouseDir());
TestTxnDbUtil.setConfValues(conf);
}
@BeforeClass
@@ -58,15 +69,19 @@ public abstract class DbTxnManagerEndToEndTestBase {
@Before
public void setUp() throws Exception {
// set up metastore client cache
- if (conf.getBoolVar(HiveConf.ConfVars.MSC_CACHE_ENABLED)) {
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.MSC_CACHE_ENABLED)) {
HiveMetaStoreClientWithLocalCache.init(conf);
}
SessionState.start(conf);
ctx = new Context(conf);
driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, -1);
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, false);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+
TestTxnDbUtil.cleanDb(conf);
SessionState ss = SessionState.get();
ss.initTxnMgr(conf);
@@ -74,7 +89,15 @@ public abstract class DbTxnManagerEndToEndTestBase {
Assert.assertTrue(txnMgr instanceof DbTxnManager);
txnHandler = TxnUtils.getTxnStore(conf);
+ File f = new File(getWarehouseDir());
+ if (f.exists()) {
+ FileUtil.fullyDelete(f);
+ }
+ if (!(new File(getWarehouseDir()).mkdirs())) {
+ throw new RuntimeException("Could not create " + getWarehouseDir());
+ }
}
+
@After
public void tearDown() throws Exception {
driver.close();
@@ -82,5 +105,10 @@ public abstract class DbTxnManagerEndToEndTestBase {
if (txnMgr != null) {
txnMgr.closeTxnManager();
}
+ FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
+ }
+
+ protected String getWarehouseDir() {
+ return TEST_DATA_DIR + "/warehouse";
}
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index a8cb6aa..c3db57d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.ql.lockmgr;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -37,6 +40,7 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.junit.Assert;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -48,6 +52,8 @@ import org.junit.ComparisonFailure;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.FieldSetter;
import java.util.ArrayList;
import java.util.Arrays;
@@ -3361,4 +3367,98 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
Assert.assertEquals("1\t2", res.get(0));
}
+ @Test
+ public void testDropPartitionNonBlocking() throws Exception {
+ testDropPartition(false);
+ }
+ @Test
+ public void testDropPartitionBlocking() throws Exception {
+ testDropPartition(true);
+ }
+
+ private void testDropPartition(boolean blocking) throws Exception {
+ dropTable(new String[] {"tab_acid"});
+ FileSystem fs = FileSystem.get(conf);
+
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, !blocking);
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, 1);
+ driver = Mockito.spy(driver);
+
+ HiveConf.setBoolVar(driver2.getConf(), HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, !blocking);
+ driver2 = Mockito.spy(driver2);
+
+ driver.run("create table if not exists tab_acid (a int, b int) partitioned by (p string) " +
+ "stored as orc TBLPROPERTIES ('transactional'='true')");
+ driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo'),(3,4,'bar')");
+
+ driver.compileAndRespond("select * from tab_acid");
+ List<String> res = new ArrayList<>();
+
+ driver.lockAndRespond();
+ List<ShowLocksResponseElement> locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 1, locks.size());
+
+ checkLock(LockType.SHARED_READ,
+ LockState.ACQUIRED, "default", "tab_acid", null, locks);
+
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+ driver2.compileAndRespond("alter table tab_acid drop partition (p='foo')");
+
+ if (blocking) {
+ txnMgr2.acquireLocks(driver2.getPlan(), ctx, null, false);
+ locks = getLocks();
+
+ ShowLocksResponseElement checkLock = checkLock(LockType.EXCLUSIVE,
+ LockState.WAITING, "default", "tab_acid", "p=foo", locks);
+
+ swapTxnManager(txnMgr);
+ Mockito.doNothing().when(driver).lockAndRespond();
+ driver.run();
+
+ driver.getFetchTask().fetch(res);
+ swapTxnManager(txnMgr2);
+
+ FieldSetter.setField(txnMgr2, txnMgr2.getClass().getDeclaredField("numStatements"), 0);
+ txnMgr2.getMS().unlock(checkLock.getLockid());
+ }
+ driver2.lockAndRespond();
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", blocking ? 1 : 2, locks.size());
+
+ checkLock(blocking ? LockType.EXCLUSIVE : LockType.EXCL_WRITE,
+ LockState.ACQUIRED, "default", "tab_acid", "p=foo", locks);
+
+ Mockito.doNothing().when(driver2).lockAndRespond();
+ driver2.run();
+
+ if (!blocking) {
+ swapTxnManager(txnMgr);
+ Mockito.doNothing().when(driver).lockAndRespond();
+ driver.run();
+ }
+ Mockito.reset(driver, driver2);
+
+ FileStatus[] stat = fs.listStatus(new Path(getWarehouseDir(), "tab_acid" + (blocking ? "" : "/p=foo")),
+ (blocking ? path -> path.getName().equals("p=foo") : AcidUtils.baseFileFilter));
+ if ((blocking ? 0 : 1) != stat.length) {
+ Assert.fail("Partition data was " + (blocking ? "not " : "") + "removed from FS");
+ }
+ driver.getFetchTask().fetch(res);
+ Assert.assertEquals("Expecting 2 rows and found " + res.size(), 2, res.size());
+
+ driver.run("select * from tab_acid where p='foo'");
+ res = new ArrayList<>();
+ driver.getFetchTask().fetch(res);
+ Assert.assertEquals("Expecting 0 rows and found " + res.size(), 0, res.size());
+
+ //re-create partition with the same name
+ driver.run("insert into tab_acid partition(p) (a,b,p) values(1,2,'foo')");
+
+ driver.run("select * from tab_acid where p='foo'");
+ res = new ArrayList<>();
+ driver.getFetchTask().fetch(res);
+ Assert.assertEquals("Expecting 1 rows and found " + res.size(), 1, res.size());
+ }
+
}
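
The new test above toggles the two settings that select the non-blocking path. A minimal
sketch of enabling them programmatically; the config names are taken from the test, while
their defaults and any further interactions are assumptions not spelled out in this patch:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SoftDeleteDropConf {
      /** Sketch: turn on lockless reads plus the soft-delete DROP PARTITION behaviour. */
      public static HiveConf enable(HiveConf conf) {
        // Readers keep SHARED_READ locks, so a concurrent drop does not block them.
        HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, true);
        // DROP PARTITION leaves a base_N marker for the Cleaner instead of deleting data inline.
        HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_DROP_PARTITION_USE_BASE, true);
        return conf;
      }
    }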
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index ca8e1e0..33871fa 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -899,7 +900,7 @@ public class TestInitiator extends CompactorTest {
}
/**
- * Tests org.apache.hadoop.hive.ql.txn.compactor.CompactorThread#findUserToRunAs(java.lang.String, org.apache.hadoop
+ * Tests org.apache.hadoop.hive.metastore.txn.TxnUtils#findUserToRunAs(java.lang.String, org.apache.hadoop
* .hive.metastore.api.Table).
* Used by Worker and Initiator.
* Initiator caches this via Initiator#resolveUserToRunAs.
@@ -917,13 +918,13 @@ public class TestInitiator extends CompactorTest {
// user set in config
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.COMPACTOR_RUN_AS_USER, userFromConf);
initiator.setConf(conf);
- Assert.assertEquals(userFromConf, initiator.findUserToRunAs(t.getSd().getLocation(), t));
+ Assert.assertEquals(userFromConf, TxnUtils.findUserToRunAs(t.getSd().getLocation(), t, conf));
// table dir owner (is probably not "randomUser123")
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.COMPACTOR_RUN_AS_USER, "");
// simulate restarting Initiator
initiator.setConf(conf);
- Assert.assertNotEquals(userFromConf, initiator.findUserToRunAs(t.getSd().getLocation(), t));
+ Assert.assertNotEquals(userFromConf, TxnUtils.findUserToRunAs(t.getSd().getLocation(), t, conf));
}
/**
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 8ae55a3..3d13a9e 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -615,16 +615,18 @@ int _kTxnTypeValues[] = {
TxnType::REPL_CREATED,
TxnType::READ_ONLY,
TxnType::COMPACTION,
- TxnType::MATER_VIEW_REBUILD
+ TxnType::MATER_VIEW_REBUILD,
+ TxnType::SOFT_DELETE
};
const char* _kTxnTypeNames[] = {
"DEFAULT",
"REPL_CREATED",
"READ_ONLY",
"COMPACTION",
- "MATER_VIEW_REBUILD"
+ "MATER_VIEW_REBUILD",
+ "SOFT_DELETE"
};
-const std::map<int, const char*> _TxnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(5, _kTxnTypeValues, _kTxnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
+const std::map<int, const char*> _TxnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kTxnTypeValues, _kTxnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr));
std::ostream& operator<<(std::ostream& out, const TxnType::type& val) {
std::map<int, const char*>::const_iterator it = _TxnType_VALUES_TO_NAMES.find(val);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index a0a105c..6f3d42d 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -298,7 +298,8 @@ struct TxnType {
REPL_CREATED = 1,
READ_ONLY = 2,
COMPACTION = 3,
- MATER_VIEW_REBUILD = 4
+ MATER_VIEW_REBUILD = 4,
+ SOFT_DELETE = 5
};
};
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java
index ea2b2fc..39b4248 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java
@@ -13,7 +13,8 @@ public enum TxnType implements org.apache.thrift.TEnum {
REPL_CREATED(1),
READ_ONLY(2),
COMPACTION(3),
- MATER_VIEW_REBUILD(4);
+ MATER_VIEW_REBUILD(4),
+ SOFT_DELETE(5);
private final int value;
@@ -45,6 +46,8 @@ public enum TxnType implements org.apache.thrift.TEnum {
return COMPACTION;
case 4:
return MATER_VIEW_REBUILD;
+ case 5:
+ return SOFT_DELETE;
default:
return null;
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php
index 6c2e294..5179566 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php
@@ -28,12 +28,15 @@ final class TxnType
const MATER_VIEW_REBUILD = 4;
+ const SOFT_DELETE = 5;
+
static public $__names = array(
0 => 'DEFAULT',
1 => 'REPL_CREATED',
2 => 'READ_ONLY',
3 => 'COMPACTION',
4 => 'MATER_VIEW_REBUILD',
+ 5 => 'SOFT_DELETE',
);
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 3e34455..5c15eb1 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -377,6 +377,7 @@ class TxnType(object):
READ_ONLY = 2
COMPACTION = 3
MATER_VIEW_REBUILD = 4
+ SOFT_DELETE = 5
_VALUES_TO_NAMES = {
0: "DEFAULT",
@@ -384,6 +385,7 @@ class TxnType(object):
2: "READ_ONLY",
3: "COMPACTION",
4: "MATER_VIEW_REBUILD",
+ 5: "SOFT_DELETE",
}
_NAMES_TO_VALUES = {
@@ -392,6 +394,7 @@ class TxnType(object):
"READ_ONLY": 2,
"COMPACTION": 3,
"MATER_VIEW_REBUILD": 4,
+ "SOFT_DELETE": 5,
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 8cecd8f..195de51 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -170,8 +170,9 @@ module TxnType
READ_ONLY = 2
COMPACTION = 3
MATER_VIEW_REBUILD = 4
- VALUE_MAP = {0 => "DEFAULT", 1 => "REPL_CREATED", 2 => "READ_ONLY", 3 => "COMPACTION", 4 => "MATER_VIEW_REBUILD"}
- VALID_VALUES = Set.new([DEFAULT, REPL_CREATED, READ_ONLY, COMPACTION, MATER_VIEW_REBUILD]).freeze
+ SOFT_DELETE = 5
+ VALUE_MAP = {0 => "DEFAULT", 1 => "REPL_CREATED", 2 => "READ_ONLY", 3 => "COMPACTION", 4 => "MATER_VIEW_REBUILD", 5 => "SOFT_DELETE"}
+ VALID_VALUES = Set.new([DEFAULT, REPL_CREATED, READ_ONLY, COMPACTION, MATER_VIEW_REBUILD, SOFT_DELETE]).freeze
end
module GetTablesExtRequestFields
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index a077ab1..8e373e4 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -44,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
+import java.util.Optional;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1759,10 +1760,22 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
req.setDeleteData(options.deleteData);
req.setNeedResult(options.returnResults);
req.setIfExists(options.ifExists);
+
+ EnvironmentContext context = null;
if (options.purgeData) {
LOG.info("Dropped partitions will be purged!");
- req.setEnvironmentContext(getEnvironmentContextWithIfPurgeSet());
+ context = getEnvironmentContextWithIfPurgeSet();
}
+ if (options.writeId != null) {
+ context = Optional.ofNullable(context).orElse(new EnvironmentContext());
+ context.putToProperties("writeId", options.writeId.toString());
+ }
+ if (options.txnId != null) {
+ context = Optional.ofNullable(context).orElse(new EnvironmentContext());
+ context.putToProperties("txnId", options.txnId.toString());
+ }
+ req.setEnvironmentContext(context);
+
return client.drop_partitions_req(req).getPartitions();
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
index 3488062..da6d634 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockComponentBuilder.java
@@ -128,8 +128,8 @@ public class LockComponentBuilder {
return component;
}
- public LockComponent setLock(LockType type) {
+ public LockComponentBuilder setLock(LockType type) {
component.setType(type);
- return component;
+ return this;
}
}
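
The setLock fix above makes the method return the builder rather than the half-built
component, so it composes with the other builder methods. A small usage sketch; the other
setter names are assumed from the builder's existing API, and the lock type mirrors the
test earlier in this patch:

    import org.apache.hadoop.hive.metastore.LockComponentBuilder;
    import org.apache.hadoop.hive.metastore.api.LockComponent;
    import org.apache.hadoop.hive.metastore.api.LockType;

    public class LockComponentBuilderChaining {
      // Sketch: setLock(...) is now chainable instead of leaking the raw LockComponent.
      static LockComponent exclWriteOn(String db, String table, String partition) {
        return new LockComponentBuilder()
            .setLock(LockType.EXCL_WRITE)
            .setDbName(db)
            .setTableName(table)
            .setPartitionName(partition)
            .build();
      }
    }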
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionDropOptions.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionDropOptions.java
index 40018c9..db61aeb 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionDropOptions.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/PartitionDropOptions.java
@@ -28,6 +28,9 @@ public class PartitionDropOptions {
public boolean returnResults = true;
public boolean purgeData = false;
+ public Long writeId;
+ public Long txnId;
+
public static PartitionDropOptions instance() { return new PartitionDropOptions(); }
public PartitionDropOptions deleteData(boolean deleteData) {
@@ -50,5 +53,14 @@ public class PartitionDropOptions {
return this;
}
+ public PartitionDropOptions setWriteId(Long writeId) {
+ this.writeId = writeId;
+ return this;
+ }
+
+ public void setTxnId(Long txnId) {
+ this.txnId = txnId;
+ }
+
} // class PartitionDropSwitches;
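
Taken together with the HiveMetaStoreClient change above, these fields carry the
transaction identity of the DROP through the drop request. A caller-side sketch; the
surrounding method and identifier values are illustrative, only the option setters come
from this patch:

    import org.apache.hadoop.hive.metastore.PartitionDropOptions;

    public class SoftDeleteDropOptions {
      /**
       * Sketch: with deleteData(false) and txnId/writeId set, HiveMetaStoreClient#dropPartitions
       * (patched above) forwards both as the EnvironmentContext properties "txnId" and "writeId";
       * the metastore side (HMSHandler and AcidEventListener, below) then writes a DROPPED base
       * marker and queues a cleanup request instead of deleting the partition's files inline.
       */
      static PartitionDropOptions forSoftDelete(long txnId, long writeId) {
        PartitionDropOptions options = PartitionDropOptions.instance()
            .deleteData(false)
            .setWriteId(writeId);   // fluent setter added by this patch
        options.setTxnId(txnId);    // void setter added by this patch
        return options;
      }
    }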
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index fa5cd19..737b7b5 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -997,7 +997,8 @@ enum TxnType {
REPL_CREATED = 1,
READ_ONLY = 2,
COMPACTION = 3,
- MATER_VIEW_REBUILD = 4
+ MATER_VIEW_REBUILD = 4,
+ SOFT_DELETE = 5
}
// specifies which info to return with GetTablesExtRequest
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
index 2ecdb96..2034d85 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AcidEventListener.java
@@ -19,7 +19,11 @@
package org.apache.hadoop.hive.metastore;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.HiveObjectType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -35,6 +39,14 @@ import org.apache.hadoop.hive.metastore.events.DropTableEvent;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.hadoop.hive.metastore.HMSHandler.isMustPurge;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.throwMetaException;
+
/**
* It handles cleanup of dropped partition/table/database in ACID related metastore tables
@@ -68,10 +80,45 @@ public class AcidEventListener extends TransactionalMetaStoreEventListener {
@Override
public void onDropPartition(DropPartitionEvent partitionEvent) throws MetaException {
- if (TxnUtils.isTransactionalTable(partitionEvent.getTable())) {
+ Table table = partitionEvent.getTable();
+ EnvironmentContext context = partitionEvent.getEnvironmentContext();
+
+ if (TxnUtils.isTransactionalTable(table)) {
txnHandler = getTxnHandler();
- txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, partitionEvent.getTable(),
- partitionEvent.getPartitionIterator());
+ txnHandler.cleanupRecords(HiveObjectType.PARTITION, null, table, partitionEvent.getPartitionIterator());
+
+ if (!partitionEvent.getDeleteData()) {
+ long currentTxn = Optional.ofNullable(context).map(EnvironmentContext::getProperties)
+ .map(prop -> prop.get("txnId")).map(Long::parseLong)
+ .orElse(0L);
+
+ long writeId = Optional.ofNullable(context).map(EnvironmentContext::getProperties)
+ .map(prop -> prop.get("writeId")).map(Long::parseLong)
+ .orElse(0L);
+
+ try {
+ if (currentTxn > 0) {
+ CompactionRequest rqst = new CompactionRequest(
+ table.getDbName(), table.getTableName(), CompactionType.MAJOR);
+ rqst.setRunas(TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf));
+ rqst.putToProperties("ifPurge", Boolean.toString(isMustPurge(context, table)));
+
+ Iterator<Partition> partitionIterator = partitionEvent.getPartitionIterator();
+ while (partitionIterator.hasNext()) {
+ Partition p = partitionIterator.next();
+
+ List<FieldSchema> partCols = partitionEvent.getTable().getPartitionKeys(); // partition columns
+ List<String> partVals = p.getValues();
+ rqst.setPartitionname(Warehouse.makePartName(partCols, partVals));
+ rqst.putToProperties("location", p.getSd().getLocation());
+
+ txnHandler.submitForCleanup(rqst, writeId, currentTxn);
+ }
+ }
+ } catch ( InterruptedException | IOException e) {
+ throwMetaException(e);
+ }
+ }
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index fab149c..ff79412 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.AcidConstants;
import org.apache.hadoop.hive.common.AcidMetaDataFile;
+import org.apache.hadoop.hive.common.AcidMetaDataFile.DataFormat;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
@@ -104,6 +105,7 @@ import java.util.regex.Pattern;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.join;
+
import static org.apache.hadoop.hive.metastore.HiveMetaStoreClient.TRUNCATE_SKIP_DATA_DELETION;
import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.TABLE_IS_CTAS;
import static org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
@@ -3401,7 +3403,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
}
private void truncateTableInternal(String dbName, String tableName, List<String> partNames,
- String validWriteIds, long writeId, EnvironmentContext context) throws MetaException, NoSuchObjectException {
+ String validWriteIds, long writeId, EnvironmentContext context) throws MetaException, NoSuchObjectException {
boolean isSkipTrash = false, needCmRecycle = false;
try {
String[] parsedDbName = parseDbName(dbName, conf);
@@ -3423,13 +3425,12 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
// This is not transactional
for (Path location : getLocationsForTruncate(getMS(), parsedDbName[CAT_NAME], parsedDbName[DB_NAME], tableName,
tbl, partNames)) {
- FileSystem fs = location.getFileSystem(getConf());
if (!skipDataDeletion) {
- truncateDataFiles(location, fs, isSkipTrash, needCmRecycle);
+ truncateDataFiles(location, isSkipTrash, needCmRecycle);
} else {
// For Acid tables we don't need to delete the old files, only write an empty baseDir.
// Compaction and cleaner will take care of the rest
- addTruncateBaseFile(location, writeId, fs);
+ addTruncateBaseFile(location, writeId, DataFormat.TRUNCATED);
}
}
}
@@ -3448,24 +3449,35 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
* Add an empty baseDir with a truncate metadatafile
* @param location partition or table directory
* @param writeId allocated writeId
- * @param fs FileSystem
* @throws Exception
*/
- private void addTruncateBaseFile(Path location, long writeId, FileSystem fs) throws Exception {
+ private void addTruncateBaseFile(Path location, long writeId, DataFormat dataFormat)
+ throws MetaException {
+ if (location == null)
+ return;
+
Path basePath = new Path(location, AcidConstants.baseDir(writeId));
- fs.mkdirs(basePath);
- // We can not leave the folder empty, otherwise it will be skipped at some file listing in AcidUtils
- // No need for a data file, a simple metadata is enough
- AcidMetaDataFile.writeToFile(fs, basePath, AcidMetaDataFile.DataFormat.TRUNCATED);
+ try {
+ FileSystem fs = location.getFileSystem(getConf());
+ fs.mkdirs(basePath);
+ // We can not leave the folder empty, otherwise it will be skipped at some file listing in AcidUtils
+ // No need for a data file, a simple metadata is enough
+ AcidMetaDataFile.writeToFile(fs, basePath, dataFormat);
+ } catch (Exception e) {
+ throw newMetaException(e);
+ }
}
- private void truncateDataFiles(Path location, FileSystem fs, boolean isSkipTrash, boolean needCmRecycle)
- throws IOException, MetaException, NoSuchObjectException {
+ private void truncateDataFiles(Path location, boolean isSkipTrash, boolean needCmRecycle)
+ throws IOException, MetaException {
+ FileSystem fs = location.getFileSystem(getConf());
+
if (!HdfsUtils.isPathEncrypted(getConf(), fs.getUri(), location) &&
!FileUtils.pathHasSnapshotSubDir(location, fs)) {
HdfsUtils.HadoopFileStatus status = new HdfsUtils.HadoopFileStatus(getConf(), fs, location);
FileStatus targetStatus = fs.getFileStatus(location);
String targetGroup = targetStatus == null ? null : targetStatus.getGroup();
+
wh.deleteDir(location, true, isSkipTrash, needCmRecycle);
fs.mkdirs(location);
HdfsUtils.setFullFileStatus(getConf(), status, targetGroup, fs, location, false);
@@ -5029,21 +5041,21 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
}
}
- private boolean drop_partition_common(RawStore ms, String catName, String db_name,
- String tbl_name, List<String> part_vals,
- final boolean deleteData, final EnvironmentContext envContext)
- throws MetaException, NoSuchObjectException, IOException, InvalidObjectException,
- InvalidInputException {
- boolean success = false;
+ private boolean drop_partition_common(RawStore ms, String catName, String db_name, String tbl_name,
+ List<String> part_vals, boolean deleteData, final EnvironmentContext envContext)
+ throws MetaException, NoSuchObjectException, IOException, InvalidObjectException, InvalidInputException {
Path partPath = null;
- Table tbl = null;
- Partition part = null;
boolean isArchived = false;
Path archiveParentDir = null;
+ boolean success = false;
+
+ Table tbl = null;
+ Partition part = null;
boolean mustPurge = false;
- boolean tableDataShouldBeDeleted = false;
- boolean needsCm = false;
+ long writeId = 0;
+
Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+ boolean needsCm = false;
if (db_name == null) {
throw new MetaException("The DB name cannot be null.");
@@ -5057,19 +5069,19 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
try {
ms.openTransaction();
+
part = ms.getPartition(catName, db_name, tbl_name, part_vals);
GetTableRequest request = new GetTableRequest(db_name,tbl_name);
request.setCatName(catName);
tbl = get_table_core(request);
- tableDataShouldBeDeleted = checkTableDataShouldBeDeleted(tbl, deleteData);
firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this));
+
mustPurge = isMustPurge(envContext, tbl);
-
+ writeId = getWriteId(envContext);
+
if (part == null) {
- throw new NoSuchObjectException("Partition doesn't exist. "
- + part_vals);
+ throw new NoSuchObjectException("Partition doesn't exist. " + part_vals);
}
-
isArchived = MetaStoreUtils.isArchived(part);
if (isArchived) {
archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
@@ -5098,28 +5110,26 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
} finally {
if (!success) {
ms.rollbackTransaction();
- } else if (deleteData && ((partPath != null) || (archiveParentDir != null))) {
- if (tableDataShouldBeDeleted) {
- if (mustPurge) {
- LOG.info("dropPartition() will purge " + partPath + " directly, skipping trash.");
- }
- else {
- LOG.info("dropPartition() will move " + partPath + " to trash-directory.");
- }
- // Archived partitions have har:/to_har_file as their location.
- // The original directory was saved in params
+ } else if (checkTableDataShouldBeDeleted(tbl, deleteData) &&
+ (partPath != null || archiveParentDir != null)) {
- if (isArchived) {
- assert (archiveParentDir != null);
- wh.deleteDir(archiveParentDir, true, mustPurge, needsCm);
- } else {
- assert (partPath != null);
- wh.deleteDir(partPath, true, mustPurge, needsCm);
- deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, needsCm);
- }
- // ok even if the data is not deleted
+ LOG.info(mustPurge ?
+ "dropPartition() will purge " + partPath + " directly, skipping trash." :
+ "dropPartition() will move " + partPath + " to trash-directory.");
+
+ // Archived partitions have har:/to_har_file as their location.
+ // The original directory was saved in params
+ if (isArchived) {
+ wh.deleteDir(archiveParentDir, true, mustPurge, needsCm);
+ } else {
+ wh.deleteDir(partPath, true, mustPurge, needsCm);
+ deleteParentRecursive(partPath.getParent(), part_vals.size() - 1, mustPurge, needsCm);
}
+ // ok even if the data is not deleted
+ } else if (TxnUtils.isTransactionalTable(tbl) && writeId > 0) {
+ addTruncateBaseFile(partPath, writeId, DataFormat.DROPPED);
}
+
if (!listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.DROP_PARTITION,
@@ -5131,7 +5141,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
return true;
}
- private static boolean isMustPurge(EnvironmentContext envContext, Table tbl) {
+ static boolean isMustPurge(EnvironmentContext envContext, Table tbl) {
// Data needs deletion. Check if trash may be skipped.
// Trash may be skipped iff:
// 1. deleteData == true, obviously.
@@ -5143,6 +5153,14 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
|| MetaStoreUtils.isSkipTrash(tbl.getParameters());
}
+ private long getWriteId(EnvironmentContext context){
+ return Optional.ofNullable(context)
+ .map(EnvironmentContext::getProperties)
+ .map(prop -> prop.get("writeId"))
+ .map(Long::parseLong)
+ .orElse(0L);
+ }
+
private void throwUnsupportedExceptionIfRemoteDB(String dbName, String operationName) throws MetaException {
if (isDatabaseRemote(dbName)) {
throw new MetaException("Operation " + operationName + " not supported for REMOTE database " + dbName);
@@ -5218,32 +5236,38 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
RawStore ms = getMS();
String dbName = request.getDbName(), tblName = request.getTblName();
String catName = request.isSetCatName() ? request.getCatName() : getDefaultCatalog(conf);
+
boolean ifExists = request.isSetIfExists() && request.isIfExists();
boolean deleteData = request.isSetDeleteData() && request.isDeleteData();
boolean ignoreProtection = request.isSetIgnoreProtection() && request.isIgnoreProtection();
boolean needResult = !request.isSetNeedResult() || request.isNeedResult();
+
List<PathAndDepth> dirsToDelete = new ArrayList<>();
List<Path> archToDelete = new ArrayList<>();
- EnvironmentContext envContext = request.isSetEnvironmentContext()
- ? request.getEnvironmentContext() : null;
-
+ EnvironmentContext envContext =
+ request.isSetEnvironmentContext() ? request.getEnvironmentContext() : null;
boolean success = false;
- ms.openTransaction();
+
Table tbl = null;
List<Partition> parts = null;
boolean mustPurge = false;
+ long writeId = 0;
+
Map<String, String> transactionalListenerResponses = null;
- boolean needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName),
- ms.getTable(catName, dbName, tblName));
-
+ boolean needsCm = false;
+
try {
+ ms.openTransaction();
// We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
// Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes.
tbl = get_table_core(catName, dbName, tblName);
mustPurge = isMustPurge(envContext, tbl);
+ writeId = getWriteId(envContext);
+
int minCount = 0;
RequestPartsSpec spec = request.getParts();
List<String> partNames = null;
+
if (spec.isSetExprs()) {
// Dropping by expressions.
parts = new ArrayList<>(spec.getExprs().size());
@@ -5291,7 +5315,6 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
}
for (Partition part : parts) {
-
// TODO - we need to speed this up for the normal path where all partitions are under
// the table and we don't have to stat every partition
@@ -5314,24 +5337,24 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
}
ms.dropPartitions(catName, dbName, tblName, partNames);
- if (parts != null && !parts.isEmpty() && !transactionalListeners.isEmpty()) {
+ if (!parts.isEmpty() && !transactionalListeners.isEmpty()) {
transactionalListenerResponses = MetaStoreListenerNotifier
.notifyEvent(transactionalListeners, EventType.DROP_PARTITION,
new DropPartitionEvent(tbl, parts, true, deleteData, this), envContext);
}
-
success = ms.commitTransaction();
+ needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName), tbl);
+
DropPartitionsResult result = new DropPartitionsResult();
if (needResult) {
result.setPartitions(parts);
}
-
return result;
} finally {
if (!success) {
ms.rollbackTransaction();
} else if (checkTableDataShouldBeDeleted(tbl, deleteData)) {
- LOG.info( mustPurge?
+ LOG.info(mustPurge ?
"dropPartition() will purge partition-directories directly, skipping trash."
: "dropPartition() will move partition-directories to trash-directory.");
// Archived partitions have har:/to_har_file as their location.
@@ -5369,10 +5392,19 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
throw new MetaException("Failed to delete parent: " + ex.getMessage());
}
}
+ } else if (TxnUtils.isTransactionalTable(tbl) && writeId > 0) {
+ for (Partition part : parts) {
+ if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
+ Path partPath = new Path(part.getSd().getLocation());
+ verifyIsWritablePath(partPath);
+
+ addTruncateBaseFile(partPath, writeId, DataFormat.DROPPED);
+ }
+ }
}
if (parts != null) {
- if (parts != null && !parts.isEmpty() && !listeners.isEmpty()) {
+ if (!parts.isEmpty() && !listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners,
EventType.DROP_PARTITION,
new DropPartitionEvent(tbl, parts, success, deleteData, this),
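
The net effect of the writeId > 0 branches above is that a soft-deleted partition keeps its
files and only gains a marker base directory. A condensed sketch built from the same helpers
the handler uses; treat it as an illustration of the on-disk layout, not a separate code path:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.AcidConstants;
    import org.apache.hadoop.hive.common.AcidMetaDataFile;

    public class DroppedBaseMarker {
      /**
       * Sketch: equivalent of addTruncateBaseFile(partPath, writeId, DataFormat.DROPPED).
       * It creates <partition>/base_<writeId>/ holding only a small metadata file, so readers
       * treat the partition as empty, and the Cleaner later removes the directory using the
       * entry that AcidEventListener queued for it.
       */
      static void writeDroppedMarker(Configuration conf, Path partPath, long writeId) throws Exception {
        Path basePath = new Path(partPath, AcidConstants.baseDir(writeId));
        FileSystem fs = partPath.getFileSystem(conf);
        fs.mkdirs(basePath);
        AcidMetaDataFile.writeToFile(fs, basePath, AcidMetaDataFile.DataFormat.DROPPED);
      }
    }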
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 9cece8a..3a159f7 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -366,6 +366,7 @@ class CompactionTxnHandler extends TxnHandler {
info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
info.runAs = rs.getString(6);
info.highestWriteId = rs.getLong(7);
+ info.properties = rs.getString(8);
if (LOG.isDebugEnabled()) {
LOG.debug("Found ready to clean: " + info.toString());
}
@@ -1503,7 +1504,7 @@ class CompactionTxnHandler extends TxnHandler {
protected void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId,
long tempId) throws SQLException, MetaException {
super.updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnType, commitId, tempId);
- if (txnType == TxnType.COMPACTION) {
+ if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
stmt.executeUpdate(
"UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " + commitId + ", \"CQ_COMMIT_TIME\" = " +
getEpochFn(dbProduct) + " WHERE \"CQ_TXN_ID\" = " + txnid);
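
With this change a committing SOFT_DELETE transaction (the new non-blocking DROP PARTITION path) stamps CQ_NEXT_TXN_ID and CQ_COMMIT_TIME on its COMPACTION_QUEUE entry, exactly as a compaction does, so the Cleaner can tell when no open reader could still need the old files. A short sketch of that gating idea, with the surrounding Cleaner logic assumed rather than copied from this commit:

public class CleanGateSketch {
  // Cleaning is safe once every transaction that could still read the pre-drop snapshot has
  // gone away, i.e. the lowest open txn id has moved past the recorded next-txn watermark.
  static boolean safeToClean(long cqNextTxnId, long minOpenTxnId) {
    return cqNextTxnId < minOpenTxnId;
  }
}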
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 887da84..2a9d6bb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -55,6 +55,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.sql.DataSource;
@@ -167,6 +168,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.base.Splitter;
+import static org.apache.commons.lang3.StringUtils.repeat;
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.executeQueriesInBatchNoCount;
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.executeQueriesInBatch;
@@ -3814,6 +3816,74 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ @Override
+ @RetrySemantics.Idempotent
+ public boolean submitForCleanup(CompactionRequest rqst, long highestWriteId, long txnId) throws MetaException {
+ // Put a compaction request in the queue.
+ try {
+ Connection dbConn = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ lockInternal();
+
+ List<String> params = new ArrayList<String>() {{
+ add(rqst.getDbname());
+ add(rqst.getTablename());
+ }};
+ long cqId;
+ try (Statement stmt = dbConn.createStatement()) {
+ cqId = generateCompactionQueueId(stmt);
+ }
+ StringBuilder buf = new StringBuilder(
+ "INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TXN_ID\", \"CQ_ENQUEUE_TIME\", \"CQ_DATABASE\", \"CQ_TABLE\", ");
+ String partName = rqst.getPartitionname();
+ if (partName != null) {
+ buf.append("\"CQ_PARTITION\", ");
+ params.add(partName);
+ }
+ buf.append("\"CQ_STATE\", \"CQ_TYPE\"");
+ params.add(String.valueOf(READY_FOR_CLEANING));
+ params.add(thriftCompactionType2DbType(rqst.getType()).toString());
+
+ if (rqst.getProperties() != null) {
+ buf.append(", \"CQ_TBLPROPERTIES\"");
+ params.add(new StringableMap(rqst.getProperties()).toString());
+ }
+ if (rqst.getRunas() != null) {
+ buf.append(", \"CQ_RUN_AS\"");
+ params.add(rqst.getRunas());
+ }
+ buf.append(") values (")
+ .append(
+ Stream.of(cqId, highestWriteId, txnId, getEpochFn(dbProduct))
+ .map(Object::toString)
+ .collect(Collectors.joining(", ")))
+ .append(repeat(", ?", params.size()))
+ .append(")");
+
+ String s = buf.toString();
+ try (PreparedStatement pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params)) {
+ LOG.debug("Going to execute update <" + s + ">");
+ pst.executeUpdate();
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ return true;
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback: ", e);
+ rollbackDBConn(dbConn);
+ checkRetryable(e, "submitForCleanup(" + rqst + ")");
+ throw new MetaException("Failed to submit cleanup request: " +
+ StringUtils.stringifyException(e));
+ } finally {
+ closeDbConn(dbConn);
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ return submitForCleanup(rqst, highestWriteId, txnId);
+ }
+ }
+
private static String compactorStateToResponse(char s) {
switch (s) {
case INITIATED_STATE: return INITIATED_RESPONSE;
@@ -4274,6 +4344,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
partVals = p.getValues();
partName = Warehouse.makePartName(partCols, partVals);
+ buff.setLength(0);
buff.append("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_DATABASE\"='");
buff.append(dbName);
buff.append("' AND \"TC_TABLE\"='");
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 58e0999..5837727 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -346,6 +346,9 @@ public interface TxnStore extends Configurable {
@RetrySemantics.Idempotent
CompactionResponse compact(CompactionRequest rqst) throws MetaException;
+ @RetrySemantics.Idempotent
+ boolean submitForCleanup(CompactionRequest rqst, long highestWriteId, long txnId) throws MetaException;
+
/**
* Show list of current compactions.
* @param rqst info on which compactions to show
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 13d45d1..eca6caa 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -18,6 +18,9 @@
package org.apache.hadoop.hive.metastore.txn;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -31,9 +34,13 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
@@ -495,4 +502,64 @@ public class TxnUtils {
DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct, conf);
stmt.execute(databaseProduct.getTxnSeedFn(seedTxnId));
}
+
+ /**
+ * Determine which user to run an operation as. If metastore.compactor.run.as.user is set, that user will be
+ * returned; if not, the owner of the directory to be compacted will be.
+ * It is asserted that either the user running the hive metastore or the table
+ * owner must be able to stat the directory and determine the owner.
+ * @param location directory that will be read or written to.
+ * @param t metastore table object
+ * @return the metastore.compactor.run.as.user value, or, if that is not set, the username of the owner of the location.
+ * @throws java.io.IOException if neither the hive metastore user nor the table owner can stat
+ * the location.
+ */
+ public static String findUserToRunAs(String location, Table t, Configuration conf)
+ throws IOException, InterruptedException {
+ LOG.debug("Determining who to run the job as.");
+
+ // check if a specific user is set in config
+ String runUserAs = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.COMPACTOR_RUN_AS_USER);
+ if (runUserAs != null && !"".equals(runUserAs)) {
+ return runUserAs;
+ }
+ // get table directory owner
+ Path p = new Path(location);
+ FileSystem fs = p.getFileSystem(conf);
+
+ try {
+ FileStatus stat = fs.getFileStatus(p);
+ LOG.debug("Running job as " + stat.getOwner());
+ return stat.getOwner();
+ } catch (AccessControlException e) {
+ // TODO not sure this is the right exception
+ LOG.debug("Unable to stat file as current user, trying as table owner");
+
+ // Now, try it as the table owner and see if we get better luck.
+ List<String> wrapper = new ArrayList<>(1);
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(t.getOwner(),
+ UserGroupInformation.getLoginUser());
+
+ ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ // need to use a new filesystem object here to have the correct ugi
+ FileSystem proxyFs = p.getFileSystem(conf);
+ FileStatus stat = proxyFs.getFileStatus(p);
+ wrapper.add(stat.getOwner());
+ return null;
+ });
+
+ try {
+ FileSystem.closeAllForUGI(ugi);
+ } catch (IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: " + ugi, exception);
+ }
+ if (wrapper.size() == 1) {
+ LOG.debug("Running job as " + wrapper.get(0));
+ return wrapper.get(0);
+ }
+ }
+ LOG.error("Unable to stat file " + p + " as either current user(" +
+ UserGroupInformation.getLoginUser() + ") or table owner(" + t.getOwner() + "), giving up");
+ throw new IOException("Unable to stat file: " + p);
+ }
}
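
The javadoc on findUserToRunAs() above spells out the lookup order: the config override first, then the owner of the location, retried as the table owner if the metastore user cannot stat it. A minimal usage sketch, assuming a caller that wants to run some filesystem work as the resolved user; the helper and its parameters are placeholders:

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.security.UserGroupInformation;

public class RunAsSketch {
  static void runAsResolvedUser(Configuration conf, Table t, String location, Runnable work)
      throws Exception {
    String user = TxnUtils.findUserToRunAs(location, t, conf);
    if (user.equals(UserGroupInformation.getLoginUser().getShortUserName())) {
      work.run();  // already the right user, no proxying needed
    } else {
      UserGroupInformation ugi =
          UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
      ugi.doAs((PrivilegedExceptionAction<Void>) () -> { work.run(); return null; });
      FileSystem.closeAllForUGI(ugi);  // release cached filesystem handles created under the proxy user
    }
  }
}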
diff --git a/storage-api/src/java/org/apache/hadoop/hive/common/AcidMetaDataFile.java b/storage-api/src/java/org/apache/hadoop/hive/common/AcidMetaDataFile.java
index e4a9abd..e833060 100644
--- a/storage-api/src/java/org/apache/hadoop/hive/common/AcidMetaDataFile.java
+++ b/storage-api/src/java/org/apache/hadoop/hive/common/AcidMetaDataFile.java
@@ -55,7 +55,9 @@ public class AcidMetaDataFile {
// written by Major compaction
COMPACTED,
// written by truncate
- TRUNCATED;
+ TRUNCATED,
+ // written by drop partition
+ DROPPED;
@Override
public String toString() {
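
The hunk above adds the DROPPED constant for bases written by DROP PARTITION. A short, hedged sketch of how code aware of it might branch on AcidMetaDataFile.DataFormat; reading and parsing the on-disk metadata file is assumed here and is not part of this excerpt:

import org.apache.hadoop.hive.common.AcidMetaDataFile.DataFormat;

public class DataFormatSketch {
  // A base written by DROP PARTITION means the whole partition directory can eventually go away,
  // whereas TRUNCATED and COMPACTED bases still describe live table data.
  static boolean shouldDropWholeDirectory(DataFormat format) {
    return format == DataFormat.DROPPED;
  }
}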