Posted to commits@hive.apache.org by pv...@apache.org on 2019/11/07 08:56:59 UTC
[hive] branch master updated: HIVE-22401: ACID: Refactor
CompactorMR (Laszlo Pinter reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 358e5a9 HIVE-22401: ACID: Refactor CompactorMR (Laszlo Pinter reviewed by Peter Vary)
358e5a9 is described below
commit 358e5a93f3c1e33662bf35d75343b31f4d1246bf
Author: Laszlo Pinter <lp...@cloudera.com>
AuthorDate: Thu Nov 7 09:46:55 2019 +0100
HIVE-22401: ACID: Refactor CompactorMR (Laszlo Pinter reviewed by Peter Vary)
---
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 451 +--------------------
.../hive/ql/txn/compactor/MajorQueryCompactor.java | 225 ++++++++++
.../ql/txn/compactor/MmMajorQueryCompactor.java | 324 +++++++++++++++
.../hive/ql/txn/compactor/QueryCompactor.java | 106 +++++
.../ql/txn/compactor/QueryCompactorFactory.java | 58 +++
5 files changed, 720 insertions(+), 444 deletions(-)
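In short: the query-based compaction logic moves out of CompactorMR into a small strategy hierarchy behind a factory. A minimal, self-contained sketch of that shape, using simplified stand-in names (the real QueryCompactorFactory.getQueryCompactor takes Table, HiveConf and CompactionInfo, as the diff below shows):

    // Sketch of the factory dispatch this commit introduces. The booleans are
    // hypothetical stand-ins for AcidUtils.isInsertOnlyTable(...) and the
    // COMPACTOR_CRUD_QUERY_BASED / HIVE_COMPACTOR_COMPACT_MM ConfVars checks.
    abstract class QueryCompactorSketch {
      abstract void runCompaction() throws java.io.IOException;
    }

    final class QueryCompactorFactorySketch {
      private QueryCompactorFactorySketch() {
      }

      static QueryCompactorSketch getQueryCompactor(boolean insertOnly, boolean crudQueryBased,
          boolean compactMm, boolean isMajor) {
        if (!insertOnly && crudQueryBased) {
          if (isMajor) {
            return new QueryCompactorSketch() {
              @Override void runCompaction() { /* MajorQueryCompactor path */ }
            };
          }
          throw new RuntimeException("Query based compaction is not currently supported for minor compactions");
        }
        if (insertOnly && compactMm) {
          return new QueryCompactorSketch() {
            @Override void runCompaction() { /* MmMajorQueryCompactor path */ }
          };
        }
        return null; // caller falls back to the MR-based compaction
      }
    }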
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 0f1579a..ee2c0f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -17,31 +17,24 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.HashSet;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
-import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -50,19 +43,13 @@ import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.DriverUtils;
-import org.apache.hadoop.hive.ql.ddl.table.creation.ShowCreateTableOperation;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.AcidInputFormat;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -71,10 +58,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.RecordIdentifier;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.util.DirectionUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -95,9 +78,7 @@ import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.HiveStringUtils;
import org.apache.hive.common.util.Ref;
import org.apache.parquet.Strings;
import org.apache.thrift.TException;
@@ -238,26 +219,16 @@ public class CompactorMR {
}
/**
- * Run major compaction in a HiveQL query (compaction for MM tables handled in runMmCompaction method).
- * TODO:
+ * Run major compaction in a HiveQL query (compaction for MM tables handled in {@link MmMajorQueryCompactor}
+ * class).
+ * Find a better way:
* 1. A good way to run minor compaction (currently disabled when this config is enabled)
* 2. More generic approach to collecting files in the same logical bucket to compact within the same task
* (currently we're using Tez split grouping).
*/
- if (!AcidUtils.isInsertOnlyTable(t.getParameters()) && HiveConf.getBoolVar(conf,
- ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
- if (ci.isMajorCompaction()) {
- runCrudCompaction(conf, t, p, sd, writeIds, ci);
- return;
- } else {
- throw new RuntimeException("Query based compaction is not currently supported for minor compactions");
- }
- }
-
- if (AcidUtils.isInsertOnlyTable(t.getParameters())) {
- if (HiveConf.getBoolVar(conf, ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
- runMmCompaction(conf, t, p, sd, writeIds, ci);
- }
+ QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
+ if (queryCompactor != null) {
+ queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci);
return;
}
@@ -334,82 +305,6 @@ public class CompactorMR {
su.gatherStats();
}
- /**
- * @param sd (this is the resolved StorageDescriptor, i.e. resolved to table or partition)
- * @param writeIds (valid write ids used to filter rows while they're being read for compaction)
- * @throws IOException
- */
- private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
- CompactionInfo ci) throws IOException {
- AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
- AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(
- false), false, t.getParameters(), false);
-
- if (!isEnoughToCompact(dir, sd)) {
- return;
- }
-
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
- SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true);
- // Set up the session for driver.
- HiveConf conf = new HiveConf(hiveConf);
- conf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
- /**
- * For now, we will group splits on tez so that we end up with all bucket files,
- * with same bucket number in one map task.
- */
- conf.set(ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
- String tmpPrefix = t.getDbName() + "_tmp_compactor_" + t.getTableName() + "_";
- String tmpTableName = tmpPrefix + System.currentTimeMillis();
- long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
- try {
- // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234
- String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, t, sd);
- LOG.info("Running major compaction query into temp table with create definition: {}", query);
- try {
- DriverUtils.runOnDriver(conf, user, sessionState, query);
- } catch (Exception ex) {
- Throwable cause = ex;
- while (cause != null && !(cause instanceof AlreadyExistsException)) {
- cause = cause.getCause();
- }
- if (cause == null) {
- throw new IOException(ex);
- }
- }
- query = buildCrudMajorCompactionQuery(conf, t, p, tmpTableName);
- LOG.info("Running major compaction via query: {}", query);
- /**
- * This will create bucket files like:
- * db/db_tmp_compactor_tbl_1234/00000_0
- * db/db_tmp_compactor_tbl_1234/00001_0
- */
- DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
- /**
- * This achieves a final layout like (wid is the highest valid write id for this major compaction):
- * db/tbl/ptn/base_wid/bucket_00000
- * db/tbl/ptn/base_wid/bucket_00001
- */
- org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
- String tmpLocation = tempTable.getSd().getLocation();
- commitCrudMajorCompaction(t, tmpLocation, tmpTableName, sd.getLocation(), conf, writeIds, compactorTxnId);
- } catch (HiveException e) {
- LOG.error("Error doing query based major compaction", e);
- throw new IOException(e);
- } finally {
- try {
- DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName);
- } catch (HiveException e) {
- LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName);
- LOG.error(ExceptionUtils.getStackTrace(e));
- }
- }
- }
-
- private static boolean isEnoughToCompact(AcidUtils.Directory dir, StorageDescriptor sd) {
- return isEnoughToCompact(true, dir, sd);
- }
-
private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
int deltaCount = dir.getCurrentDirectories().size();
int origCount = dir.getOriginalFiles().size();
@@ -442,308 +337,9 @@ public class CompactorMR {
return isEnoughToCompact;
}
- private void runMmCompaction(HiveConf conf, Table t, Partition p,
- StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
- LOG.debug("Going to delete directories for aborted transactions for MM table "
- + t.getDbName() + "." + t.getTableName());
- AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false),
- false, t.getParameters(), false);
- removeFilesForMmTable(conf, dir);
-
- // Then, actually do the compaction.
- if (!ci.isMajorCompaction()) {
- // Not supported for MM tables right now.
- LOG.info("Not compacting " + sd.getLocation() + "; not a major compaction");
- return;
- }
-
- if (!isEnoughToCompact(dir, sd)) {
- return;
- }
-
- try {
- String tmpLocation = generateTmpPath(sd);
- Path baseLocation = new Path(tmpLocation, "_base");
-
- // Set up the session for driver.
- HiveConf driverConf = new HiveConf(conf);
- driverConf.set(ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
- driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused
- //thinking it already has a txn opened
-
- String user = UserGroupInformation.getCurrentUser().getShortUserName();
- SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, true);
-
- // Note: we could skip creating the table and just add table type stuff directly to the
- // "insert overwrite directory" command if there were no bucketing or list bucketing.
- String tmpPrefix = t.getDbName() + ".tmp_compactor_" + t.getTableName() + "_";
- String tmpTableName = null;
- while (true) {
- tmpTableName = tmpPrefix + System.currentTimeMillis();
- String query = buildMmCompactionCtQuery(tmpTableName, t,
- p == null ? t.getSd() : p.getSd(), baseLocation.toString());
- LOG.info("Compacting a MM table into " + query);
- try {
- DriverUtils.runOnDriver(driverConf, user, sessionState, query);
- break;
- } catch (Exception ex) {
- Throwable cause = ex;
- while (cause != null && !(cause instanceof AlreadyExistsException)) {
- cause = cause.getCause();
- }
- if (cause == null) {
- throw new IOException(ex);
- }
- }
- }
- String query = buildMmCompactionQuery(conf, t, p, tmpTableName);
- LOG.info("Compacting a MM table via " + query);
- long compactorTxnId = CompactorMap.getCompactorTxnId(conf);
- DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId);
- commitMmCompaction(tmpLocation, sd.getLocation(), conf, writeIds, compactorTxnId);
- DriverUtils.runOnDriver(driverConf, user, sessionState,
- "drop table if exists " + tmpTableName);
- } catch (HiveException e) {
- LOG.error("Error compacting a MM table", e);
- throw new IOException(e);
- }
- }
-
private String generateTmpPath(StorageDescriptor sd) {
return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
}
-
- /**
- * Note on ordering of rows in the temp table:
- * We need each final bucket file soreted by original write id (ascending), bucket (ascending) and row id (ascending).
- * (current write id will be the same as original write id).
- * We will be achieving the ordering via a custom split grouper for compactor.
- * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description.
- * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}
- * for details on the mechanism.
- */
- private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t, StorageDescriptor sd) {
- StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
- // Acid virtual columns
- query.append(
- "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<");
- List<FieldSchema> cols = t.getSd().getCols();
- boolean isFirst = true;
- // Actual columns
- for (FieldSchema col : cols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
- }
- query.append(">)");
- query.append(" stored as orc");
- query.append(" tblproperties ('transactional'='false')");
- return query.toString();
- }
-
- private String buildCrudMajorCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
- String fullName = t.getDbName() + "." + t.getTableName();
- String query = "insert into table " + tmpName + " ";
- String filter = "";
- if (p != null) {
- filter = filter + " where ";
- List<String> vals = p.getValues();
- List<FieldSchema> keys = t.getPartitionKeys();
- assert keys.size() == vals.size();
- for (int i = 0; i < keys.size(); ++i) {
- filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
- }
- }
- query += " select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, "
- + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(";
- List<FieldSchema> cols = t.getSd().getCols();
- for (int i = 0; i < cols.size(); ++i) {
- query += (i == 0 ? "'" : ", '") + cols.get(i).getName() + "', " + cols.get(i).getName();
- }
- query += ") from " + fullName + filter;
- return query;
- }
-
- /**
- * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn.
- * Since the temp table is a non-transactional table, it has file names in the "original" format.
- * Also, due to split grouping in
- * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)},
- * we will end up with one file per bucket.
- */
- private void commitCrudMajorCompaction(Table t, String from, String tmpTableName, String to, HiveConf conf,
- ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
- Path fromPath = new Path(from);
- Path toPath = new Path(to);
- Path tmpTablePath = new Path(fromPath, tmpTableName);
- FileSystem fs = fromPath.getFileSystem(conf);
- // Assume the high watermark can be used as maximum transaction ID.
- long maxTxn = actualWriteIds.getHighWatermark();
- // Get a base_wid path which will be the new compacted base
- AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false)
- .maximumWriteId(maxTxn).bucket(0).statementId(-1);
- Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
- if (!fs.exists(fromPath)) {
- LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir);
- fs.mkdirs(newBaseDir);
- return;
- }
- LOG.info("Moving contents of {} to {}", tmpTablePath, to);
- /**
- * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on
- * TODO/ToThink:
- * Q. Can file with name 0000_0 under temp table be deterministically renamed to bucket_00000 in the destination?
- */
- // List<String> buckCols = t.getSd().getBucketCols();
- FileStatus[] children = fs.listStatus(fromPath);
- for (FileStatus filestatus : children) {
- String originalFileName = filestatus.getPath().getName();
- // This if() may not be required I think...
- if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
- int bucketId = AcidUtils.parseBucketId(filestatus.getPath());
- options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn)
- .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId);
- Path finalBucketFile = AcidUtils.createFilename(toPath, options);
- Hive.moveFile(conf, filestatus.getPath(), finalBucketFile, true, false, false);
- }
- }
- fs.delete(fromPath, true);
- }
-
- private String buildMmCompactionCtQuery(
- String fullName, Table t, StorageDescriptor sd, String location) {
- StringBuilder query = new StringBuilder("create temporary table ")
- .append(fullName).append("(");
- List<FieldSchema> cols = t.getSd().getCols();
- boolean isFirst = true;
- for (FieldSchema col : cols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("`").append(col.getName()).append("` ").append(col.getType());
- }
- query.append(") ");
-
- // Bucketing.
- List<String> buckCols = t.getSd().getBucketCols();
- if (buckCols.size() > 0) {
- query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
- List<Order> sortCols = t.getSd().getSortCols();
- if (sortCols.size() > 0) {
- query.append("SORTED BY (");
- isFirst = true;
- for (Order sortCol : sortCols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
- }
- query.append(") ");
- }
- query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
- }
-
- // Stored as directories. We don't care about the skew otherwise.
- if (t.getSd().isStoredAsSubDirectories()) {
- SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
- if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
- query.append(" SKEWED BY (").append(
- StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
- isFirst = true;
- for (List<String> colValues : skewedInfo.getSkewedColValues()) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("('").append(StringUtils.join("','", colValues)).append("')");
- }
- query.append(") STORED AS DIRECTORIES");
- }
- }
-
- SerDeInfo serdeInfo = sd.getSerdeInfo();
- Map<String, String> serdeParams = serdeInfo.getParameters();
- query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(
- serdeInfo.getSerializationLib())).append("'");
- String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
- assert sh == null; // Not supposed to be a compactable table.
- if (!serdeParams.isEmpty()) {
- ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
- }
- query.append("STORED AS INPUTFORMAT '").append(
- HiveStringUtils.escapeHiveCommand(sd.getInputFormat())).append("' OUTPUTFORMAT '").append(
- HiveStringUtils.escapeHiveCommand(sd.getOutputFormat())).append("' LOCATION '").append(
- HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
- // Exclude all standard table properties.
- Set<String> excludes = getHiveMetastoreConstants();
- excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
- isFirst = true;
- for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
- if (e.getValue() == null) continue;
- if (excludes.contains(e.getKey())) continue;
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("'").append(e.getKey()).append("'='").append(
- HiveStringUtils.escapeHiveCommand(e.getValue())).append("'");
- }
- if (!isFirst) {
- query.append(", ");
- }
- query.append("'transactional'='false')");
- return query.toString();
-
- }
-
- private static Set<String> getHiveMetastoreConstants() {
- HashSet<String> result = new HashSet<>();
- for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
- if (!Modifier.isStatic(f.getModifiers())) continue;
- if (!Modifier.isFinal(f.getModifiers())) continue;
- if (!String.class.equals(f.getType())) continue;
- f.setAccessible(true);
- try {
- result.add((String)f.get(null));
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
- return result;
- }
-
- private String buildMmCompactionQuery(HiveConf conf, Table t, Partition p, String tmpName) {
- String fullName = t.getDbName() + "." + t.getTableName();
- // TODO: ideally we should make a special form of insert overwrite so that we:
- // 1) Could use fast merge path for ORC and RC.
- // 2) Didn't have to create a table.
-
- String query = "insert overwrite table " + tmpName + " ";
- String filter = "";
- if (p != null) {
- filter = " where ";
- List<String> vals = p.getValues();
- List<FieldSchema> keys = t.getPartitionKeys();
- assert keys.size() == vals.size();
- for (int i = 0; i < keys.size(); ++i) {
- filter += (i == 0 ? "`" : " and `") + (keys.get(i).getName() + "`='" + vals.get(i) + "'");
- }
- query += " select ";
- // Use table descriptor for columns.
- List<FieldSchema> cols = t.getSd().getCols();
- for (int i = 0; i < cols.size(); ++i) {
- query += (i == 0 ? "`" : ", `") + (cols.get(i).getName() + "`");
- }
- } else {
- query += "select *";
- }
- query += " from " + fullName + filter;
- return query;
- }
/**
* @param baseDir if not null, it's either table/partition root folder or base_xxxx.
@@ -1221,7 +817,7 @@ public class CompactorMR {
deleteEventWriter.close(false);
}
}
- private static long getCompactorTxnId(Configuration jobConf) {
+ static long getCompactorTxnId(Configuration jobConf) {
String snapshot = jobConf.get(ValidTxnList.VALID_TXNS_KEY);
if(Strings.isNullOrEmpty(snapshot)) {
throw new IllegalStateException(ValidTxnList.VALID_TXNS_KEY + " not found for writing to "
@@ -1423,37 +1019,4 @@ public class CompactorMR {
fs.delete(tmpLocation, true);
}
}
-
- /**
- * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
- * @param from The temp directory used for compactor output. Not the actual base/delta.
- * @param to The final directory; basically a SD directory. Not the actual base/delta.
- * @param compactorTxnId txn that the compactor started
- */
- private void commitMmCompaction(String from, String to, Configuration conf,
- ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException {
- Path fromPath = new Path(from), toPath = new Path(to);
- FileSystem fs = fromPath.getFileSystem(conf);
- // Assume the high watermark can be used as maximum transaction ID.
- //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
- //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
- long maxTxn = actualWriteIds.getHighWatermark();
- AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
- .writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0).statementId(-1)
- .visibilityTxnId(compactorTxnId);
- Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
- if (!fs.exists(fromPath)) {
- LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
- fs.mkdirs(newBaseDir);
- return;
- }
- LOG.info("Moving contents of " + from + " to " + to);
- FileStatus[] children = fs.listStatus(fromPath);
- if (children.length != 1) {
- throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
- }
- FileStatus dirPath = children[0];
- fs.rename(dirPath.getPath(), newBaseDir);
- fs.delete(fromPath, true);
- }
}
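Both new compactors set up a dedicated driver configuration before issuing HiveQL: HIVE_QUOTEDID_SUPPORT is set on both paths (the ACID virtual columns are backtick-quoted), while SPLIT_GROUPING_MODE is set only on the CRUD major path. A minimal sketch of that setup, assuming only a HiveConf:

    import org.apache.hadoop.hive.conf.HiveConf;

    class CompactorSessionConfSketch {
      // Mirrors the driver conf used by the query-based compactors: quoted
      // identifiers on both paths, compactor split grouping on the CRUD path
      // so files of the same logical bucket land in one map task.
      static HiveConf forQueryCompaction(HiveConf base, boolean crudMajor) {
        HiveConf conf = new HiveConf(base);
        conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
        if (crudMajor) {
          conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
        }
        return conf;
      }
    }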
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
new file mode 100644
index 0000000..10681c0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Class responsible for running query-based major compaction.
+ */
+class MajorQueryCompactor extends QueryCompactor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MajorQueryCompactor.class.getName());
+
+ @Override
+ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+ ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+ AcidUtils
+ .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
+ AcidUtils.Directory dir = AcidUtils
+ .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
+ table.getParameters(), false);
+
+ if (!Util.isEnoughToCompact(true, dir, storageDescriptor)) {
+ return;
+ }
+
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true);
+ // Set up the session for driver.
+ HiveConf conf = new HiveConf(hiveConf);
+ conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+ /*
+ * For now, we will group splits on tez so that we end up with all bucket files,
+ * with same bucket number in one map task.
+ */
+ conf.set(HiveConf.ConfVars.SPLIT_GROUPING_MODE.varname, "compactor");
+ String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_";
+ String tmpTableName = tmpPrefix + System.currentTimeMillis();
+ long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+ try {
+ // Create a temporary table under the temp location --> db/tbl/ptn/_tmp_1234/db.tmp_compactor_tbl_1234
+ String query = buildCrudMajorCompactionCreateTableQuery(tmpTableName, table);
+ LOG.info("Running major compaction query into temp table with create definition: {}", query);
+ try {
+ DriverUtils.runOnDriver(conf, user, sessionState, query);
+ } catch (Exception ex) {
+ Throwable cause = ex;
+ while (cause != null && !(cause instanceof AlreadyExistsException)) {
+ cause = cause.getCause();
+ }
+ if (cause == null) {
+ throw new IOException(ex);
+ }
+ }
+ query = buildCrudMajorCompactionQuery(table, partition, tmpTableName);
+ LOG.info("Running major compaction via query: {}", query);
+ /*
+ * This will create bucket files like:
+ * db/db_tmp_compactor_tbl_1234/00000_0
+ * db/db_tmp_compactor_tbl_1234/00001_0
+ */
+ DriverUtils.runOnDriver(conf, user, sessionState, query, writeIds, compactorTxnId);
+ /*
+ * This achieves a final layout like (wid is the highest valid write id for this major compaction):
+ * db/tbl/ptn/base_wid/bucket_00000
+ * db/tbl/ptn/base_wid/bucket_00001
+ */
+ org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
+ String tmpLocation = tempTable.getSd().getLocation();
+ commitCrudMajorCompaction(tmpLocation, tmpTableName, storageDescriptor.getLocation(), conf, writeIds,
+ compactorTxnId);
+ } catch (HiveException e) {
+ LOG.error("Error doing query based major compaction", e);
+ throw new IOException(e);
+ } finally {
+ try {
+ DriverUtils.runOnDriver(conf, user, sessionState, "drop table if exists " + tmpTableName);
+ } catch (HiveException e) {
+ LOG.error("Unable to delete drop temp table {} which was created for running major compaction", tmpTableName);
+ LOG.error(ExceptionUtils.getStackTrace(e));
+ }
+ }
+ }
+
+ /**
+ * Note on ordering of rows in the temp table:
+ * We need each final bucket file sorted by original write id (ascending), bucket (ascending) and row id (ascending).
+ * (current write id will be the same as original write id).
+ * We will be achieving the ordering via a custom split grouper for compactor.
+ * See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description.
+ * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}
+ * for details on the mechanism.
+ */
+ private String buildCrudMajorCompactionCreateTableQuery(String fullName, Table t) {
+ StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
+ // Acid virtual columns
+ query.append(
+ "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, "
+ + "`row` struct<");
+ List<FieldSchema> cols = t.getSd().getCols();
+ boolean isFirst = true;
+ // Actual columns
+ for (FieldSchema col : cols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
+ }
+ query.append(">)");
+ query.append(" stored as orc");
+ query.append(" tblproperties ('transactional'='false')");
+ return query.toString();
+ }
+
+ private String buildCrudMajorCompactionQuery(Table t, Partition p, String tmpName) {
+ String fullName = t.getDbName() + "." + t.getTableName();
+ StringBuilder query = new StringBuilder("insert into table " + tmpName + " ");
+ StringBuilder filter = new StringBuilder();
+ if (p != null) {
+ filter.append(" where ");
+ List<String> vals = p.getValues();
+ List<FieldSchema> keys = t.getPartitionKeys();
+ assert keys.size() == vals.size();
+ for (int i = 0; i < keys.size(); ++i) {
+ filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
+ .append("'");
+ }
+ }
+ query.append(" select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, "
+ + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(");
+ List<FieldSchema> cols = t.getSd().getCols();
+ for (int i = 0; i < cols.size(); ++i) {
+ query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ").append(cols.get(i).getName());
+ }
+ query.append(") from ").append(fullName).append(filter);
+ return query.toString();
+ }
+
+ /**
+ * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn.
+ * Since the temp table is a non-transactional table, it has file names in the "original" format.
+ * Also, due to split grouping in
+ * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)},
+ * we will end up with one file per bucket.
+ */
+ private void commitCrudMajorCompaction(String from, String tmpTableName, String to, HiveConf conf,
+ ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
+ Path fromPath = new Path(from);
+ Path toPath = new Path(to);
+ Path tmpTablePath = new Path(fromPath, tmpTableName);
+ FileSystem fs = fromPath.getFileSystem(conf);
+ // Assume the high watermark can be used as maximum transaction ID.
+ long maxTxn = actualWriteIds.getHighWatermark();
+ // Get a base_wid path which will be the new compacted base
+ AcidOutputFormat.Options options =
+ new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
+ .statementId(-1);
+ Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+ if (!fs.exists(fromPath)) {
+ LOG.info("{} not found. Assuming 0 splits. Creating {}", from, newBaseDir);
+ fs.mkdirs(newBaseDir);
+ return;
+ }
+ LOG.info("Moving contents of {} to {}", tmpTablePath, to);
+ /*
+ * Currently mapping file with name 0000_0 to bucket_00000, 0000_1 to bucket_00001 and so on
+ * TODO/ToThink:
+ * Q. Can file with name 0000_0 under temp table be deterministically renamed to bucket_00000 in the destination?
+ */
+ // List<String> buckCols = t.getSd().getBucketCols();
+ FileStatus[] children = fs.listStatus(fromPath);
+ for (FileStatus filestatus : children) {
+ String originalFileName = filestatus.getPath().getName();
+ // This if() may not be required I think...
+ if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
+ int bucketId = AcidUtils.parseBucketId(filestatus.getPath());
+ options = new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn)
+ .bucket(bucketId).statementId(-1).visibilityTxnId(compactorTxnId);
+ Path finalBucketFile = AcidUtils.createFilename(toPath, options);
+ Hive.moveFile(conf, filestatus.getPath(), finalBucketFile, true, false, false);
+ }
+ }
+ fs.delete(fromPath, true);
+ }
+}
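To make the two query builders above concrete: for a hypothetical unpartitioned table db.t with columns (a int, b string), they produce roughly the statements below. The strings were traced by hand from the builders, not captured from a run, and the temp table suffix is invented:

    // Hand-traced builder output for a hypothetical table db.t(a int, b string).
    class CrudCompactionQueriesSketch {
      static final String CREATE =
          "create temporary table db_tmp_compactor_t_1234 ("
              + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, "
              + "`currentTransaction` bigint, `row` struct<`a` :int, `b` :string>)"
              + " stored as orc tblproperties ('transactional'='false')";

      static final String INSERT =
          "insert into table db_tmp_compactor_t_1234 "
              + " select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), "
              + "ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, "
              + "NAMED_STRUCT('a', a, 'b', b) from db.t";

      public static void main(String[] args) {
        System.out.println(CREATE);
        System.out.println(INSERT);
      }
    }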
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
new file mode 100644
index 0000000..f7e0a85
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.ddl.table.creation.ShowCreateTableOperation;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Class responsible for running query-based major compaction on insert-only tables.
+ */
+class MmMajorQueryCompactor extends QueryCompactor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MmMajorQueryCompactor.class.getName());
+
+ @Override
+ void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+ ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+ LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table
+ .getTableName());
+ AcidUtils.Directory dir = AcidUtils
+ .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
+ table.getParameters(), false);
+ removeFilesForMmTable(hiveConf, dir);
+
+ // Then, actually do the compaction.
+ if (!compactionInfo.isMajorCompaction()) {
+ // Not supported for MM tables right now.
+ LOG.info("Not compacting " + storageDescriptor.getLocation() + "; not a major compaction");
+ return;
+ }
+
+ if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
+ return;
+ }
+
+ try {
+ String tmpLocation = Util.generateTmpPath(storageDescriptor);
+ Path baseLocation = new Path(tmpLocation, "_base");
+
+ // Set up the session for driver.
+ HiveConf driverConf = new HiveConf(hiveConf);
+ driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
+ driverConf.unset(ValidTxnList.VALID_TXNS_KEY); //so Driver doesn't get confused
+ //thinking it already has a txn opened
+
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ SessionState sessionState = DriverUtils.setUpSessionState(driverConf, user, true);
+
+ // Note: we could skip creating the table and just add table type stuff directly to the
+ // "insert overwrite directory" command if there were no bucketing or list bucketing.
+ String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
+ String tmpTableName;
+ while (true) {
+ tmpTableName = tmpPrefix + System.currentTimeMillis();
+ String query =
+ buildMmCompactionCtQuery(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
+ baseLocation.toString());
+ LOG.info("Compacting a MM table into " + query);
+ try {
+ DriverUtils.runOnDriver(driverConf, user, sessionState, query);
+ break;
+ } catch (Exception ex) {
+ Throwable cause = ex;
+ while (cause != null && !(cause instanceof AlreadyExistsException)) {
+ cause = cause.getCause();
+ }
+ if (cause == null) {
+ throw new IOException(ex);
+ }
+ }
+ }
+ String query = buildMmCompactionQuery(table, partition, tmpTableName);
+ LOG.info("Compacting a MM table via " + query);
+ long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(hiveConf);
+ DriverUtils.runOnDriver(driverConf, user, sessionState, query, writeIds, compactorTxnId);
+ commitMmCompaction(tmpLocation, storageDescriptor.getLocation(), hiveConf, writeIds, compactorTxnId);
+ DriverUtils.runOnDriver(driverConf, user, sessionState, "drop table if exists " + tmpTableName);
+ } catch (HiveException e) {
+ LOG.error("Error compacting a MM table", e);
+ throw new IOException(e);
+ }
+ }
+
+ // Remove the directories for aborted transactions only
+ private void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
+ // For MM table, we only want to delete delta dirs for aborted txns.
+ List<Path> filesToDelete = dir.getAbortedDirectories();
+ if (filesToDelete.size() < 1) {
+ return;
+ }
+ LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
+ FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+ for (Path dead : filesToDelete) {
+ LOG.debug("Going to delete path " + dead.toString());
+ fs.delete(dead, true);
+ }
+ }
+
+ private String buildMmCompactionCtQuery(String fullName, Table t, StorageDescriptor sd, String location) {
+ StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append("(");
+ List<FieldSchema> cols = t.getSd().getCols();
+ boolean isFirst = true;
+ for (FieldSchema col : cols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("`").append(col.getName()).append("` ").append(col.getType());
+ }
+ query.append(") ");
+
+ // Bucketing.
+ List<String> buckCols = t.getSd().getBucketCols();
+ if (buckCols.size() > 0) {
+ query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+ List<Order> sortCols = t.getSd().getSortCols();
+ if (sortCols.size() > 0) {
+ query.append("SORTED BY (");
+ isFirst = true;
+ for (Order sortCol : sortCols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
+ }
+ query.append(") ");
+ }
+ query.append("INTO ").append(t.getSd().getNumBuckets()).append(" BUCKETS");
+ }
+
+ // Stored as directories. We don't care about the skew otherwise.
+ if (t.getSd().isStoredAsSubDirectories()) {
+ SkewedInfo skewedInfo = t.getSd().getSkewedInfo();
+ if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+ query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+ isFirst = true;
+ for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("('").append(StringUtils.join("','", colValues)).append("')");
+ }
+ query.append(") STORED AS DIRECTORIES");
+ }
+ }
+
+ SerDeInfo serdeInfo = sd.getSerdeInfo();
+ Map<String, String> serdeParams = serdeInfo.getParameters();
+ query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+ .append("'");
+ String sh = t.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+ assert sh == null; // Not supposed to be a compactable table.
+ if (!serdeParams.isEmpty()) {
+ ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
+ }
+ query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
+ .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
+ .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
+ // Exclude all standard table properties.
+ Set<String> excludes = getHiveMetastoreConstants();
+ excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+ isFirst = true;
+ for (Map.Entry<String, String> e : t.getParameters().entrySet()) {
+ if (e.getValue() == null) {
+ continue;
+ }
+ if (excludes.contains(e.getKey())) {
+ continue;
+ }
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
+ .append("'");
+ }
+ if (!isFirst) {
+ query.append(", ");
+ }
+ query.append("'transactional'='false')");
+ return query.toString();
+
+ }
+
+ private String buildMmCompactionQuery(Table t, Partition p, String tmpName) {
+ String fullName = t.getDbName() + "." + t.getTableName();
+ // Ideally we should make a special form of insert overwrite so that we:
+ // 1) Could use fast merge path for ORC and RC.
+ // 2) Didn't have to create a table.
+
+ StringBuilder query = new StringBuilder("insert overwrite table " + tmpName + " ");
+ StringBuilder filter = new StringBuilder();
+ if (p != null) {
+ filter.append(" where ");
+ List<String> vals = p.getValues();
+ List<FieldSchema> keys = t.getPartitionKeys();
+ assert keys.size() == vals.size();
+ for (int i = 0; i < keys.size(); ++i) {
+ filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
+ .append("'");
+ }
+ query.append(" select ");
+ // Use table descriptor for columns.
+ List<FieldSchema> cols = t.getSd().getCols();
+ for (int i = 0; i < cols.size(); ++i) {
+ query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
+ }
+ } else {
+ query.append("select *");
+ }
+ query.append(" from ").append(fullName).append(filter);
+ return query.toString();
+ }
+
+ /**
+ * Note: similar logic to the main committer; however, no ORC versions and stuff like that.
+ * @param from The temp directory used for compactor output. Not the actual base/delta.
+ * @param to The final directory; basically a SD directory. Not the actual base/delta.
+ * @param compactorTxnId txn that the compactor started
+ */
+ private void commitMmCompaction(String from, String to, Configuration conf, ValidWriteIdList actualWriteIds,
+ long compactorTxnId) throws IOException {
+ Path fromPath = new Path(from), toPath = new Path(to);
+ FileSystem fs = fromPath.getFileSystem(conf);
+ // Assume the high watermark can be used as maximum transaction ID.
+ //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
+ //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
+ long maxTxn = actualWriteIds.getHighWatermark();
+ AcidOutputFormat.Options options =
+ new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
+ .statementId(-1).visibilityTxnId(compactorTxnId);
+ Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
+ if (!fs.exists(fromPath)) {
+ LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
+ fs.mkdirs(newBaseDir);
+ return;
+ }
+ LOG.info("Moving contents of " + from + " to " + to);
+ FileStatus[] children = fs.listStatus(fromPath);
+ if (children.length != 1) {
+ throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
+ }
+ FileStatus dirPath = children[0];
+ fs.rename(dirPath.getPath(), newBaseDir);
+ fs.delete(fromPath, true);
+ }
+
+ private static Set<String> getHiveMetastoreConstants() {
+ Set<String> result = new HashSet<>();
+ for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+ if (!Modifier.isStatic(f.getModifiers())) {
+ continue;
+ }
+ if (!Modifier.isFinal(f.getModifiers())) {
+ continue;
+ }
+ if (!String.class.equals(f.getType())) {
+ continue;
+ }
+ f.setAccessible(true);
+ try {
+ result.add((String) f.get(null));
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+}
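The commitMmCompaction step above reduces to a single directory promotion plus cleanup. A minimal sketch, under the assumption that the temp location holds exactly the _base directory written by the CTAS; in the real code newBaseDir is derived via AcidUtils.createFilename from the write-id high watermark and the compactor txn id:

    import java.io.IOException;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class MmCommitSketch {
      // Promote the single _base directory under tmpLocation to the final
      // base_<hwm>_v<txnid> directory, then drop the tmp scaffolding.
      static void commit(Configuration conf, Path tmpLocation, Path newBaseDir) throws IOException {
        FileSystem fs = tmpLocation.getFileSystem(conf);
        FileStatus[] children = fs.listStatus(tmpLocation);
        if (children.length != 1) {
          throw new IOException("Unexpected files in the source: " + Arrays.toString(children));
        }
        fs.rename(children[0].getPath(), newBaseDir);
        fs.delete(tmpLocation, true);
      }
    }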
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
new file mode 100644
index 0000000..80119de
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Common interface for query-based compactions.
+ */
+abstract class QueryCompactor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueryCompactor.class.getName());
+ private static final String TMPDIR = "_tmp";
+
+ /**
+ * Start a query-based compaction.
+ * @param hiveConf the Hive configuration
+ * @param table the table where the compaction should run
+ * @param partition the partition where the compaction should run
+ * @param storageDescriptor the resolved storage descriptor
+ * @param writeIds valid write IDs used to filter rows while they're being read for compaction
+ * @param compactionInfo provides info about the type of compaction
+ * @throws IOException if the compaction cannot be finished.
+ */
+ abstract void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+ ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException;
+
+ /**
+ * Collection of some helper functions.
+ */
+ static class Util {
+ /**
+ * Determine if compaction can run in the specified directory.
+ * @param isMajorCompaction true, if the compaction is a major compaction
+ * @param dir the delta directory
+ * @param sd resolved storage descriptor
+ * @return true if compaction can run.
+ */
+ static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
+ int deltaCount = dir.getCurrentDirectories().size();
+ int origCount = dir.getOriginalFiles().size();
+
+ StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
+ boolean isEnoughToCompact;
+
+ if (isMajorCompaction) {
+ isEnoughToCompact = (origCount > 0 || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1);
+
+ } else {
+ isEnoughToCompact = (deltaCount > 1);
+
+ if (deltaCount == 2) {
+ Map<String, Long> deltaByType = dir.getCurrentDirectories().stream().collect(Collectors
+ .groupingBy(delta -> (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX),
+ Collectors.counting()));
+
+ isEnoughToCompact = (deltaByType.size() != deltaCount);
+ deltaInfo.append(" ").append(deltaByType);
+ }
+ }
+
+ if (!isEnoughToCompact) {
+ LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}", sd.getLocation(),
+ dir.getBaseDirectory(), deltaInfo, origCount);
+ }
+ return isEnoughToCompact;
+ }
+
+ /**
+ * Generate a random tmp path under the provided storage location.
+ * @param sd storage descriptor, must not be null.
+ * @return the generated path, never null.
+ */
+ static String generateTmpPath(StorageDescriptor sd) {
+ return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+ }
+ }
+}
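One subtlety in Util.isEnoughToCompact above: for minor compaction, exactly two deltas are only worth merging when they are of the same kind. A standalone sketch of just that rule, with plain directory-name strings standing in for the parsed delta objects:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    class MinorCompactionRuleSketch {
      // One insert delta plus one delete delta cannot be merged by a minor
      // compaction, so that combination is skipped.
      static boolean isEnoughToCompact(List<String> deltaDirs) {
        int deltaCount = deltaDirs.size();
        boolean enough = deltaCount > 1;
        if (deltaCount == 2) {
          Map<String, Long> deltaByType = deltaDirs.stream().collect(Collectors.groupingBy(
              d -> d.startsWith("delete_delta_") ? "delete_delta_" : "delta_", Collectors.counting()));
          enough = deltaByType.size() != deltaCount;
        }
        return enough;
      }

      public static void main(String[] args) {
        System.out.println(isEnoughToCompact(Arrays.asList("delta_1_1", "delete_delta_2_2"))); // false
        System.out.println(isEnoughToCompact(Arrays.asList("delta_1_1", "delta_2_2")));        // true
      }
    }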
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
new file mode 100644
index 0000000..41cb4b6
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+
+/**
+ * Simple factory class that returns an instance of {@link QueryCompactor}.
+ */
+final class QueryCompactorFactory {
+
+ private QueryCompactorFactory() {
+ }
+
+ /**
+ * Get an instance of {@link QueryCompactor}.
+ * @param table the table on which the compaction should run, must not be null.
+ * @param configuration the Hive configuration, must not be null.
+ * @param compactionInfo provides info about the type of compaction, must not be null.
+ * @return {@link QueryCompactor} or null.
+ */
+ static QueryCompactor getQueryCompactor(Table table, HiveConf configuration, CompactionInfo compactionInfo) {
+ if (!AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf
+ .getBoolVar(configuration, HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED)) {
+ if (compactionInfo.isMajorCompaction()) {
+ return new MajorQueryCompactor();
+ } else {
+ throw new RuntimeException("Query based compaction is not currently supported for minor compactions");
+ }
+ }
+
+ if (AcidUtils.isInsertOnlyTable(table.getParameters()) && HiveConf
+ .getBoolVar(configuration, HiveConf.ConfVars.HIVE_COMPACTOR_COMPACT_MM)) {
+ return new MmMajorQueryCompactor();
+ }
+
+ return null;
+ }
+
+}