Posted to commits@hive.apache.org by lp...@apache.org on 2020/03/26 09:35:55 UTC
[hive] branch master updated: HIVE-22875: Refactor query creation
in QueryCompactor implementations (Karen Coppage, reviewed by Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 8d71480 HIVE-22875: Refactor query creation in QueryCompactor implementations (Karen Coppage, reviewed by Laszlo Pinter)
8d71480 is described below
commit 8d71480aad8b13a67932ce01701235dc85fcab4e
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Thu Mar 26 10:28:09 2020 +0100
HIVE-22875: Refactor query creation in QueryCompactor implementations (Karen Coppage, reviewed by Laszlo Pinter)
---
.../ql/txn/compactor/TestMmCompactorOnTez.java | 3 -
.../ql/txn/compactor/CompactionQueryBuilder.java | 601 +++++++++++++++++++++
.../hive/ql/txn/compactor/MajorQueryCompactor.java | 71 +--
.../hive/ql/txn/compactor/MinorQueryCompactor.java | 147 +++--
.../ql/txn/compactor/MmMajorQueryCompactor.java | 59 +-
.../ql/txn/compactor/MmMinorQueryCompactor.java | 112 ++--
.../ql/txn/compactor/MmQueryCompactorUtils.java | 200 -------
.../hive/ql/txn/compactor/QueryCompactor.java | 42 +-
8 files changed, 787 insertions(+), 448 deletions(-)
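
For orientation, below is a minimal sketch (not part of the patch) of how a QueryCompactor implementation drives the new CompactionQueryBuilder, mirroring the MajorQueryCompactor changes further down; the table, partition, tmpTableName and tmpTableLocation variables are hypothetical placeholders:

    // CREATE the temporary compaction table next to the data being compacted
    String createQuery = new CompactionQueryBuilder(
        CompactionQueryBuilder.CompactionType.MAJOR_CRUD,
        CompactionQueryBuilder.Operation.CREATE,
        tmpTableName)
        .setSourceTab(table)            // metastore Table being compacted
        .setLocation(tmpTableLocation)  // filesystem location for the temp table
        .build();

    // INSERT the compacted rows, optionally filtered to a single source partition
    String insertQuery = new CompactionQueryBuilder(
        CompactionQueryBuilder.CompactionType.MAJOR_CRUD,
        CompactionQueryBuilder.Operation.INSERT,
        tmpTableName)
        .setFromTableName(table.getTableName())
        .setSourceTab(table)
        .setSourcePartition(partition)  // may be null for unpartitioned tables
        .build();

    // DROP the temporary table once the results are moved into place
    String dropQuery = new CompactionQueryBuilder(
        CompactionQueryBuilder.CompactionType.MAJOR_CRUD,
        CompactionQueryBuilder.Operation.DROP,
        tmpTableName)
        .build();
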
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
index 074430c..43a216b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestMmCompactorOnTez.java
@@ -301,7 +301,6 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
CompactorTestUtil.runCleaner(conf);
verifySuccessulTxn(1);
- List<ShowCompactResponseElement> compacts;
// Insert test data into test table
dataProvider.insertMmTestData(tableName);
// Run a compaction
@@ -341,7 +340,6 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
CompactorTestUtil.runCleaner(conf);
verifySuccessulTxn(1);
- List<ShowCompactResponseElement> compacts;
// Verify delta directories after compaction
Assert.assertEquals("Delta directories does not match after minor compaction",
Collections.singletonList("delta_0000001_0000003_v0000007"),
@@ -382,7 +380,6 @@ public class TestMmCompactorOnTez extends CompactorOnTezTest {
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MAJOR, true);
CompactorTestUtil.runCleaner(conf);
verifySuccessulTxn(1);
- List<ShowCompactResponseElement> compacts;
// Verify base directory after compaction
Assert.assertEquals("Base directory does not match after major compaction",
Collections.singletonList("base_0000003_v0000007"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
new file mode 100644
index 0000000..6b3a4db
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Builds query strings that help with query-based compaction of CRUD and insert-only tables.
+ */
+class CompactionQueryBuilder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilder.class.getName());
+
+ // required fields, set in constructor
+ private final Operation operation;
+ private String resultTableName;
+
+ // required for some types of compaction; each field's comment notes where it is required:
+ private Table sourceTab; // for Create and for Insert in CRUD and insert-only major
+ private StorageDescriptor storageDescriptor; // for Create in insert-only
+ private String location; // for Create
+ private ValidWriteIdList validWriteIdList; // for Alter/Insert in minor and CRUD
+ private AcidUtils.Directory dir; // for Alter in minor
+ private Partition sourcePartition; // for Insert in major and insert-only minor
+ private String fromTableName; // for Insert
+
+ // settable booleans
+ private boolean isPartitioned; // for Create
+ private boolean isBucketed; // for Create in CRUD
+ private boolean isDeleteDelta; // for Alter in CRUD minor
+
+ // internal use only, for legibility
+ private final boolean major;
+ private final boolean minor;
+ private final boolean crud;
+ private final boolean insertOnly;
+
+ enum CompactionType {
+ MAJOR_CRUD, MINOR_CRUD, MAJOR_INSERT_ONLY, MINOR_INSERT_ONLY
+ }
+
+ enum Operation {
+ CREATE, ALTER, INSERT, DROP
+ }
+
+ /**
+ * Set source table – the table to compact.
+ * Required for Create operations and for Insert operations in crud and insert-only major
+ * compaction.
+ *
+ * @param sourceTab table to compact, not null
+ */
+ CompactionQueryBuilder setSourceTab(Table sourceTab) {
+ this.sourceTab = sourceTab;
+ return this;
+ }
+
+ /**
+ * Set the StorageDescriptor of the table or partition to compact.
+ * Required for Create operations in insert-only compaction.
+ *
+ * @param storageDescriptor StorageDescriptor of the table or partition to compact, not null
+ */
+ CompactionQueryBuilder setStorageDescriptor(StorageDescriptor storageDescriptor) {
+ this.storageDescriptor = storageDescriptor;
+ return this;
+ }
+
+ /**
+ * Set the location of the temp tables.
+ * Used for Create operations.
+ *
+ * @param location location of the temp tables
+ */
+ CompactionQueryBuilder setLocation(String location) {
+ this.location = location;
+ return this;
+ }
+
+ /**
+ * Set list of valid write ids.
+ * Required for Alter and Insert operations in crud minor compaction.
+ *
+ * @param validWriteIdList list of valid write ids, not null
+ */
+ CompactionQueryBuilder setValidWriteIdList(ValidWriteIdList validWriteIdList) {
+ this.validWriteIdList = validWriteIdList;
+ return this;
+ }
+
+ /**
+ * Set Acid Directory.
+ * Required for Alter operations in minor compaction.
+ *
+ * @param dir Acid Directory, not null
+ */
+ CompactionQueryBuilder setDir(AcidUtils.Directory dir) {
+ this.dir = dir;
+ return this;
+ }
+
+ /**
+ * Set partition to compact, if we are compacting a partition.
+ * Required for Insert operations in major and insert-only minor compaction.
+ */
+ CompactionQueryBuilder setSourcePartition(Partition sourcePartition) {
+ this.sourcePartition = sourcePartition;
+ return this;
+ }
+
+ /**
+ * Set table to select from.
+ * Required for Insert operations.
+ *
+ * @param fromTableName name of table to select from, not null
+ */
+ CompactionQueryBuilder setFromTableName(String fromTableName) {
+ this.fromTableName = fromTableName;
+ return this;
+ }
+
+ /**
+ * If true, Create operations will result in a table with partition column `file_name`.
+ */
+ CompactionQueryBuilder setPartitioned(boolean partitioned) {
+ isPartitioned = partitioned;
+ return this;
+ }
+
+ /**
+ * If true, Create operations for CRUD minor compaction will result in a bucketed table.
+ */
+ CompactionQueryBuilder setBucketed(boolean bucketed) {
+ isBucketed = bucketed;
+ return this;
+ }
+
+ /**
+ * If true, during CRUD minor compaction, Alter operations will result in the temp table's
+ * partitions pointing to delete delta directories as opposed to insert deltas' directories (see
+ * MinorQueryCompactor for details).
+ */
+ CompactionQueryBuilder setIsDeleteDelta(boolean deleteDelta) {
+ isDeleteDelta = deleteDelta;
+ return this;
+ }
+
+ /**
+ * Construct a CompactionQueryBuilder with required params.
+ *
+ * @param compactionType major or minor; crud or insert-only, e.g. CompactionType.MAJOR_CRUD.
+ * Cannot be null.
+ * @param operation query's Operation e.g. Operation.CREATE.
+ * @param resultTableName the name of the table we are running the operation on
+ * @throws IllegalArgumentException if compactionType is null
+ */
+ CompactionQueryBuilder(CompactionType compactionType, Operation operation,
+ String resultTableName) {
+ if (compactionType == null) {
+ throw new IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be null");
+ }
+ this.operation = operation;
+ this.resultTableName = resultTableName;
+ major = compactionType == CompactionType.MAJOR_CRUD
+ || compactionType == CompactionType.MAJOR_INSERT_ONLY;
+ crud =
+ compactionType == CompactionType.MAJOR_CRUD || compactionType == CompactionType.MINOR_CRUD;
+ minor = !major;
+ insertOnly = !crud;
+ }
+
+ /**
+ * Build the query string based on parameters.
+ *
+ * @return query String
+ */
+ String build() {
+ StringBuilder query = new StringBuilder(operation.toString());
+
+ if (operation == Operation.CREATE) {
+ query.append(" temporary external");
+ }
+ if (operation == Operation.INSERT) {
+ query.append(" into");
+ }
+ query.append(" table ");
+
+ if (operation == Operation.DROP) {
+ query.append("if exists ");
+ }
+
+ query.append(resultTableName);
+
+ switch (operation) {
+ case CREATE:
+ getDdlForCreate(query);
+ break;
+ case ALTER:
+ buildAddClauseForAlter(query);
+ break;
+ case INSERT:
+ query.append(" select ");
+ buildSelectClauseForInsert(query);
+ query.append(" from ")
+ .append(fromTableName);
+ buildWhereClauseForInsert(query);
+ break;
+ case DROP:
+ default:
+ }
+
+ return query.toString();
+ }
+
+ private void buildAddClauseForAlter(StringBuilder query) {
+ if (validWriteIdList == null || dir == null) {
+ query.setLength(0);
+ return; // avoid NPEs, don't throw an exception but return an empty query
+ }
+ long minWriteID =
+ validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
+ long highWatermark = validWriteIdList.getHighWatermark();
+ List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
+ delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark
+ && delta.getMinWriteId() >= minWriteID)
+ .collect(Collectors.toList());
+ if (deltas.isEmpty()) {
+ query.setLength(0); // no alter query needed; clear StringBuilder
+ return;
+ }
+ query.append(" add ");
+ deltas.forEach(delta -> query.append("partition (file_name='")
+ .append(delta.getPath().getName()).append("')"
+ + " location '").append(delta.getPath()).append("' "));
+ }
+
+
+ private void buildSelectClauseForInsert(StringBuilder query) {
+ // Need the list of columns for major CRUD, partitioned insert-only (MM) major, and MM minor compaction
+ List<FieldSchema> cols;
+ if (major && crud || major && insertOnly && sourcePartition != null || minor && insertOnly) {
+ if (sourceTab == null) {
+ return; // avoid NPEs, don't throw an exception but skip this part of the query
+ }
+ cols = sourceTab.getSd().getCols();
+ } else {
+ cols = null;
+ }
+
+ if (crud) {
+ if (major) {
+ query.append(
+ "validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), "
+ + "ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, "
+ + "NAMED_STRUCT(");
+ for (int i = 0; i < cols.size(); ++i) {
+ query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ")
+ .append(cols.get(i).getName());
+ }
+ query.append(") ");
+ } else { //minor
+ query.append(
+ "`operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row`");
+ }
+
+ } else { // mm
+ if (major) {
+ if (sourcePartition != null) { //mmmajor and partitioned
+ for (int i = 0; i < cols.size(); ++i) {
+ query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
+ }
+ } else { // mmmajor and unpartitioned
+ query.append("*");
+ }
+ } else { // mmminor
+ for (int i = 0; i < cols.size(); ++i) {
+ query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
+ }
+ }
+ }
+ }
+
+ private void buildWhereClauseForInsert(StringBuilder query) {
+ if (major && sourcePartition != null && sourceTab != null) {
+ List<String> vals = sourcePartition.getValues();
+ List<FieldSchema> keys = sourceTab.getPartitionKeys();
+ if (keys.size() != vals.size()) {
+ throw new IllegalStateException("source partition values ("
+ + Arrays.toString(vals.toArray()) + ") do not match source table values ("
+ + Arrays.toString(keys.toArray()) + "). Failing compaction.");
+ }
+
+ query.append(" where ");
+ for (int i = 0; i < keys.size(); ++i) {
+ query.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='")
+ .append(vals.get(i)).append("'");
+ }
+ }
+
+ if (minor && crud && validWriteIdList != null) {
+ long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds();
+ if (invalidWriteIds.length > 0) {
+ query.append(" where `originalTransaction` not in (").append(
+ org.apache.commons.lang3.StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ","))
+ .append(")");
+ }
+ }
+ }
+
+ private void getDdlForCreate(StringBuilder query) {
+ defineColumns(query);
+
+ // PARTITIONED BY. Used for parts of minor compaction.
+ if (isPartitioned) {
+ query.append(" PARTITIONED BY (`file_name` STRING) ");
+ }
+
+ // CLUSTERED BY. (bucketing)
+ int bucketingVersion = 0;
+ if (crud && minor) {
+ bucketingVersion = getMinorCrudBucketing(query, bucketingVersion);
+ } else if (insertOnly) {
+ getMmBucketing(query);
+ }
+
+ // SKEWED BY
+ if (insertOnly) {
+ getSkewedByClause(query);
+ }
+
+ // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+ if (crud) {
+ query.append(" stored as orc");
+ } else {
+ copySerdeFromSourceTable(query);
+ }
+
+ // LOCATION
+ if (location != null) {
+ query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+ }
+
+ // TBLPROPERTIES
+ addTblProperties(query, bucketingVersion);
+ }
+
+ /**
+ * Define columns of the create query.
+ */
+ private void defineColumns(StringBuilder query) {
+ if (sourceTab == null) {
+ return; // avoid NPEs, don't throw an exception but skip this part of the query
+ }
+ query.append("(");
+ if (crud) {
+ query.append(
+ "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, "
+ + "`currentTransaction` bigint, `row` struct<");
+ }
+ List<FieldSchema> cols = sourceTab.getSd().getCols();
+ boolean isFirst = true;
+ for (FieldSchema col : cols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("`").append(col.getName()).append("` ");
+ query.append(crud ? ":" : "");
+ query.append(col.getType());
+ }
+ query.append(crud ? ">" : "");
+ query.append(") ");
+ }
+
+ /**
+ * Part of Create operation. Copy source table bucketing for insert-only compaction.
+ */
+ private void getMmBucketing(StringBuilder query) {
+ if (sourceTab == null) {
+ return; // avoid NPEs, don't throw an exception but skip this part of the query
+ }
+ boolean isFirst;
+ List<String> buckCols = sourceTab.getSd().getBucketCols();
+ if (buckCols.size() > 0) {
+ query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
+ List<Order> sortCols = sourceTab.getSd().getSortCols();
+ if (sortCols.size() > 0) {
+ query.append("SORTED BY (");
+ isFirst = true;
+ for (Order sortCol : sortCols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append(sortCol.getCol()).append(" ")
+ .append(DirectionUtils.codeToText(sortCol.getOrder()));
+ }
+ query.append(") ");
+ }
+ query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS");
+ }
+ }
+
+ /**
+ * Part of Create operation. Minor crud compaction uses its own bucketing system.
+ */
+ private int getMinorCrudBucketing(StringBuilder query, int bucketingVersion) {
+ if (isBucketed && sourceTab != null) { // skip if sourceTab is null to avoid NPEs
+ int numBuckets = 1;
+ try {
+ org.apache.hadoop.hive.ql.metadata.Table t =
+ Hive.get().getTable(sourceTab.getDbName(), sourceTab.getTableName());
+ numBuckets = Math.max(t.getNumBuckets(), numBuckets);
+ bucketingVersion = t.getBucketingVersion();
+ } catch (HiveException e) {
+ LOG.info("Error finding table {}. Minor compaction result will use 0 buckets.",
+ sourceTab.getTableName());
+ } finally {
+ query.append(" clustered by (`bucket`)")
+ .append(" sorted by (`bucket`, `originalTransaction`, `rowId`)")
+ .append(" into ").append(numBuckets).append(" buckets");
+ }
+ }
+ return bucketingVersion;
+ }
+
+ /**
+ * Part of Create operation. Insert-only compaction tables copy the source table's skew info.
+ */
+ private void getSkewedByClause(StringBuilder query) {
+ if (sourceTab == null) {
+ return; // avoid NPEs, don't throw an exception but skip this part of the query
+ }
+ boolean isFirst; // Stored as directories. We don't care about the skew otherwise.
+ if (sourceTab.getSd().isStoredAsSubDirectories()) {
+ SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
+ if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+ query.append(" SKEWED BY (")
+ .append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
+ isFirst = true;
+ for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("('").append(StringUtils.join("','", colValues)).append("')");
+ }
+ query.append(") STORED AS DIRECTORIES");
+ }
+ }
+ }
+
+ /**
+ * Part of Create operation. Insert-only compaction tables copy source tables' serde.
+ */
+ private void copySerdeFromSourceTable(StringBuilder query) {
+ if (storageDescriptor == null) {
+ return; // avoid NPEs, don't throw an exception but skip this part of the query
+ }
+ ensureTableToCompactIsNative();
+ SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
+ Map<String, String> serdeParams = serdeInfo.getParameters();
+ query.append(" ROW FORMAT SERDE '")
+ .append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib())).append("'");
+ // WITH SERDEPROPERTIES
+ if (!serdeParams.isEmpty()) {
+ ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
+ }
+ query.append("STORED AS INPUTFORMAT '")
+ .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getInputFormat())).append("'")
+ .append(" OUTPUTFORMAT '")
+ .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getOutputFormat()))
+ .append("'");
+ }
+
+ /**
+ * Part of Create operation. All tmp tables are non-transactional. Additionally:
+ * - Crud compaction temp tables get the "compactiontable" tblproperty.
+ * - Minor crud compaction temp tables get the bucketing version tblproperty, if the table is bucketed.
+ * - Insert-only compaction tables copy source tables' tblproperties, except metastore/statistics
+ * properties.
+ */
+ private void addTblProperties(StringBuilder query, int bucketingVersion) {
+ Map<String, String> tblProperties = new HashMap<>();
+ tblProperties.put("transactional", "false");
+ if (crud) {
+ tblProperties.put(AcidUtils.COMPACTOR_TABLE_PROPERTY, "true");
+ }
+ if (crud && minor && isBucketed) {
+ tblProperties.put("bucketing_version", String.valueOf(bucketingVersion));
+ }
+ if (insertOnly && sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is null
+ // Exclude all standard table properties.
+ Set<String> excludes = getHiveMetastoreConstants();
+ excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+ for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
+ if (e.getValue() == null) {
+ continue;
+ }
+ if (excludes.contains(e.getKey())) {
+ continue;
+ }
+ tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue()));
+ }
+ }
+
+ // add TBLPROPERTIES clause to query
+ boolean isFirst;
+ query.append(" TBLPROPERTIES (");
+ isFirst = true;
+ for (Map.Entry<String, String> property : tblProperties.entrySet()) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ query.append("'").append(property.getKey()).append("'='").append(property.getValue())
+ .append("'");
+ isFirst = false;
+ }
+ query.append(")");
+ }
+
+ private static Set<String> getHiveMetastoreConstants() {
+ Set<String> result = new HashSet<>();
+ for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+ if (!Modifier.isFinal(f.getModifiers())) {
+ continue;
+ }
+ if (!String.class.equals(f.getType())) {
+ continue;
+ }
+ f.setAccessible(true);
+ try {
+ result.add((String) f.get(null));
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+
+ private void ensureTableToCompactIsNative() {
+ if (sourceTab == null) {
+ return;
+ }
+ String storageHandler =
+ sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+ if (storageHandler != null) {
+ String message = "Table " + sourceTab.getTableName() + "has a storage handler ("
+ + storageHandler + "). Failing compaction for this non-native table.";
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ }
+}
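
To make the new builder concrete, here is a rough, hand-assembled sketch (not from the patch) of the strings build() produces for CRUD minor compaction, assuming a hypothetical source table with columns a int and b string; exact whitespace and TBLPROPERTIES ordering may differ:

    String create = new CompactionQueryBuilder(
        CompactionQueryBuilder.CompactionType.MINOR_CRUD,
        CompactionQueryBuilder.Operation.CREATE, "tmp_delta")
        .setSourceTab(sourceTable)      // hypothetical metastore Table
        .setPartitioned(true)
        .build();
    // -> "CREATE temporary external table tmp_delta(`operation` int, `originalTransaction` bigint,
    //     `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`a` :int, `b` :string>)
    //     PARTITIONED BY (`file_name` STRING)  stored as orc
    //     TBLPROPERTIES ('transactional'='false', 'compactiontable'='true')"

    String insert = new CompactionQueryBuilder(
        CompactionQueryBuilder.CompactionType.MINOR_CRUD,
        CompactionQueryBuilder.Operation.INSERT, "tmp_delta_result")
        .setFromTableName("tmp_delta")
        .setSourceTab(sourceTable)
        .setValidWriteIdList(writeIds)  // hypothetical ValidWriteIdList with invalid ids 12 and 15
        .build();
    // -> "INSERT into table tmp_delta_result select `operation`, `originalTransaction`, `bucket`,
    //     `rowId`, `currentTransaction`, `row` from tmp_delta
    //     where `originalTransaction` not in (12,15)"
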
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 9385080..f47c23a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -21,14 +21,12 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import java.io.IOException;
import java.util.List;
@@ -40,7 +38,7 @@ final class MajorQueryCompactor extends QueryCompactor {
@Override
void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
- ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException {
+ ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
AcidUtils
.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
@@ -83,57 +81,34 @@ final class MajorQueryCompactor extends QueryCompactor {
* (current write id will be the same as original write id).
* We will be achieving the ordering via a custom split grouper for compactor.
* See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description.
- * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}
- * for details on the mechanism.
*/
- private List<String> getCreateQueries(String fullName, Table t, String tmpTableLocation) throws HiveException {
- StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(fullName, t));
- org.apache.hadoop.hive.ql.metadata.Table table = Hive.get().getTable(t.getDbName(), t.getTableName(), false);
- int numBuckets = 1;
- int bucketingVersion = 0;
- if (table != null) {
- numBuckets = Math.max(table.getNumBuckets(), numBuckets);
- bucketingVersion = table.getBucketingVersion();
- }
- query.append(" clustered by (`bucket`) into ").append(numBuckets).append(" buckets");
- query.append(" stored as orc");
- query.append(" location '");
- query.append(tmpTableLocation);
- query.append("' tblproperties ('transactional'='false',");
- query.append(" 'bucketing_version'='");
- query.append(bucketingVersion);
- query.append("','");
- query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY);
- query.append("'='true'");
- query.append(")");
- return Lists.newArrayList(query.toString());
+ private List<String> getCreateQueries(String fullName, Table t, String tmpTableLocation) {
+ return Lists.newArrayList(new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MAJOR_CRUD,
+ CompactionQueryBuilder.Operation.CREATE,
+ fullName)
+ .setSourceTab(t)
+ .setLocation(tmpTableLocation)
+ .build());
}
private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
- String fullName = t.getDbName() + "." + t.getTableName();
- StringBuilder query = new StringBuilder("insert into table " + tmpName + " ");
- StringBuilder filter = new StringBuilder();
- if (p != null) {
- filter.append(" where ");
- List<String> vals = p.getValues();
- List<FieldSchema> keys = t.getPartitionKeys();
- assert keys.size() == vals.size();
- for (int i = 0; i < keys.size(); ++i) {
- filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
- .append("'");
- }
- }
- query.append(" select validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, "
- + "ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(");
- List<FieldSchema> cols = t.getSd().getCols();
- for (int i = 0; i < cols.size(); ++i) {
- query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', ").append(cols.get(i).getName());
- }
- query.append(") from ").append(fullName).append(filter);
- return Lists.newArrayList(query.toString());
+ return Lists.newArrayList(
+ new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MAJOR_CRUD,
+ CompactionQueryBuilder.Operation.INSERT,
+ tmpName)
+ .setFromTableName(t.getTableName())
+ .setSourceTab(t)
+ .setSourcePartition(p)
+ .build());
}
private List<String> getDropQueries(String tmpTableName) {
- return Lists.newArrayList("drop table if exists " + tmpTableName);
+ return Lists.newArrayList(
+ new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MAJOR_CRUD,
+ CompactionQueryBuilder.Operation.DROP,
+ tmpTableName).build());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index 01cd2fc..1bf0bee 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -17,8 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
+import com.google.common.collect.Lists;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
@@ -37,8 +35,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Class responsible for handling query based minor compaction.
@@ -66,7 +62,7 @@ final class MinorQueryCompactor extends QueryCompactor {
table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis();
List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, conf, storageDescriptor);
- List<String> compactionQueries = getCompactionQueries(tmpTableName, writeIds.getInvalidWriteIds());
+ List<String> compactionQueries = getCompactionQueries(tmpTableName, table, writeIds);
List<String> dropQueries = getDropQueries(tmpTableName);
runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
@@ -98,7 +94,7 @@ final class MinorQueryCompactor extends QueryCompactor {
* @return list of create/alter queries, always non-null
*/
private List<String> getCreateQueries(Table table, String tempTableBase, AcidUtils.Directory dir,
- ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) throws HiveException {
+ ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) {
List<String> queries = new ArrayList<>();
long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
long highWatermark = writeIds.getHighWatermark();
@@ -106,7 +102,10 @@ final class MinorQueryCompactor extends QueryCompactor {
// create delta temp table
String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase;
queries.add(buildCreateTableQuery(table, tmpTableName, true, false, null));
- buildAlterTableQuery(tmpTableName, dir, writeIds, false).ifPresent(queries::add);
+ String alterQuery = buildAlterTableQuery(tmpTableName, dir, writeIds, false);
+ if (!alterQuery.isEmpty()) {
+ queries.add(alterQuery);
+ }
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false)
.writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
.maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
@@ -120,7 +119,11 @@ final class MinorQueryCompactor extends QueryCompactor {
// create delete delta temp tables
String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase;
queries.add(buildCreateTableQuery(table, tmpDeleteTableName, true, false, null));
- buildAlterTableQuery(tmpDeleteTableName, dir, writeIds, true).ifPresent(queries::add);
+
+ alterQuery = buildAlterTableQuery(tmpDeleteTableName, dir, writeIds, true);
+ if (!alterQuery.isEmpty()) {
+ queries.add(alterQuery);
+ }
options = new AcidOutputFormat.Options(conf).writingBase(false).writingDeleteDelta(true).isCompressed(false)
.minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
String tmpTableDeleteResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
@@ -151,40 +154,16 @@ final class MinorQueryCompactor extends QueryCompactor {
* </p>
*/
private String buildCreateTableQuery(Table table, String newTableName, boolean isPartitioned,
- boolean isBucketed, String location) throws HiveException {
- StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(newTableName, table));
- if (isPartitioned) {
- query.append(" partitioned by (`file_name` string)");
- }
- int bucketingVersion = 0;
- if (isBucketed) {
- int numBuckets = 1;
- org.apache.hadoop.hive.ql.metadata.Table t = Hive.get().getTable(table.getDbName(), table.getTableName(), false);
- if (t != null) {
- numBuckets = Math.max(t.getNumBuckets(), numBuckets);
- bucketingVersion = t.getBucketingVersion();
- }
- query.append(" clustered by (`bucket`)").append(" sorted by (`bucket`, `originalTransaction`, `rowId`)")
- .append(" into ").append(numBuckets).append(" buckets");
- }
- query.append(" stored as orc");
- if (location != null && !location.isEmpty()) {
- query.append(" location '");
- query.append(location);
- query.append("'");
- }
- query.append(" tblproperties ('transactional'='false'");
- query.append(", '");
- query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY);
- query.append("'='true'");
- if (isBucketed) {
- query.append(", 'bucketing_version'='")
- .append(bucketingVersion)
- .append("')");
- } else {
- query.append(")");
- }
- return query.toString();
+ boolean isBucketed, String location) {
+ return new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MINOR_CRUD,
+ CompactionQueryBuilder.Operation.CREATE,
+ newTableName)
+ .setSourceTab(table)
+ .setBucketed(isBucketed)
+ .setPartitioned(isPartitioned)
+ .setLocation(location)
+ .build();
}
/**
@@ -194,44 +173,36 @@ final class MinorQueryCompactor extends QueryCompactor {
* @param validWriteIdList list of valid write IDs
* @param isDeleteDelta if true, only the delete delta directories will be mapped as new partitions, otherwise only
* the delta directories
- * @return alter table statement wrapped in {@link Optional}.
+ * @return alter table statement
*/
- private Optional<String> buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
+ private String buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
ValidWriteIdList validWriteIdList, boolean isDeleteDelta) {
- // add partitions
- if (!dir.getCurrentDirectories().isEmpty()) {
- long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
- long highWatermark = validWriteIdList.getHighWatermark();
- List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
- delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark
- && delta.getMinWriteId() >= minWriteID)
- .collect(Collectors.toList());
- if (!deltas.isEmpty()) {
- StringBuilder query = new StringBuilder().append("alter table ").append(tableName);
- query.append(" add ");
- deltas.forEach(
- delta -> query.append("partition (file_name='").append(delta.getPath().getName()).append("') location '")
- .append(delta.getPath()).append("' "));
- return Optional.of(query.toString());
- }
- }
- return Optional.empty();
+ return new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MINOR_CRUD,
+ CompactionQueryBuilder.Operation.ALTER,
+ tableName)
+ .setDir(dir)
+ .setValidWriteIdList(validWriteIdList)
+ .setIsDeleteDelta(isDeleteDelta)
+ .build();
}
/**
* Get a list of compaction queries which fills up the delta/delete-delta temporary result tables.
* @param tmpTableBase an unique identifier, which helps to find all the temporary tables
- * @param invalidWriteIds list of invalid write IDs. This list is used to filter out aborted/open transactions
+ * @param table the table to compact
+ * @param validWriteIdList list of valid write IDs. This list is used to filter out aborted/open
+ * transactions
* @return list of compaction queries, always non-null
*/
- private List<String> getCompactionQueries(String tmpTableBase, long[] invalidWriteIds) {
+ private List<String> getCompactionQueries(String tmpTableBase, Table table, ValidWriteIdList validWriteIdList) {
List<String> queries = new ArrayList<>();
String sourceTableName = AcidUtils.DELTA_PREFIX + tmpTableBase;
String resultTableName = sourceTableName + "_result";
- queries.add(buildCompactionQuery(sourceTableName, resultTableName, invalidWriteIds));
+ queries.add(buildCompactionQuery(sourceTableName, resultTableName, table, validWriteIdList));
String sourceDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase;
String resultDeleteTableName = sourceDeleteTableName + "_result";
- queries.add(buildCompactionQuery(sourceDeleteTableName, resultDeleteTableName, invalidWriteIds));
+ queries.add(buildCompactionQuery(sourceDeleteTableName, resultDeleteTableName, table, validWriteIdList));
return queries;
}
@@ -240,19 +211,20 @@ final class MinorQueryCompactor extends QueryCompactor {
* it into the result table, filtering out all rows which belong to open/aborted transactions.
* @param sourceTableName the name of the source table
* @param resultTableName the name of the result table
- * @param invalidWriteIds list of invalid write IDs
+ * @param table the table to compact
+ * @param validWriteIdList list of valid write IDs
* @return compaction query, always non-null
*/
- private String buildCompactionQuery(String sourceTableName, String resultTableName, long[] invalidWriteIds) {
- StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName)
- .append(" select `operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row` from ")
- .append(sourceTableName);
- if (invalidWriteIds.length > 0) {
- query.append(" where `originalTransaction` not in (")
- .append(StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")).append(")");
- }
-
- return query.toString();
+ private String buildCompactionQuery(String sourceTableName, String resultTableName, Table table,
+ ValidWriteIdList validWriteIdList) {
+ return new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MINOR_CRUD,
+ CompactionQueryBuilder.Operation.INSERT,
+ resultTableName)
+ .setFromTableName(sourceTableName)
+ .setSourceTab(table)
+ .setValidWriteIdList(validWriteIdList)
+ .build();
}
/**
@@ -261,12 +233,17 @@ final class MinorQueryCompactor extends QueryCompactor {
* @return list of drop table statements, always non-null
*/
private List<String> getDropQueries(String tmpTableBase) {
- List<String> queries = new ArrayList<>();
- String dropStm = "drop table if exists ";
- queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase);
- queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase);
- queries.add(dropStm + AcidUtils.DELTA_PREFIX + tmpTableBase + "_result");
- queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result");
- return queries;
+ return Lists.newArrayList(
+ getDropQuery(AcidUtils.DELTA_PREFIX + tmpTableBase),
+ getDropQuery(AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase),
+ getDropQuery(AcidUtils.DELTA_PREFIX + tmpTableBase + "_result"),
+ getDropQuery(AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result"));
+ }
+
+ private String getDropQuery(String tableToDrop) {
+ return new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MINOR_CRUD,
+ CompactionQueryBuilder.Operation.DROP,
+ tableToDrop).build();
}
}
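
For reference, a hypothetical sketch of the temporary tables the minor CRUD compactor creates and finally drops, following the naming in getDropQueries above (assuming AcidUtils.DELTA_PREFIX and DELETE_DELTA_PREFIX are the usual "delta_" and "delete_delta_" prefixes, and a tmpTableBase of mydb_tmp_compactor_t_1585212489000):

    // Temp tables derived from tmpTableBase; each DROP statement is generated by
    // CompactionQueryBuilder with Operation.DROP, i.e. "DROP table if exists <name>":
    //   delta_mydb_tmp_compactor_t_1585212489000                 (partitions point at insert deltas)
    //   delete_delta_mydb_tmp_compactor_t_1585212489000          (partitions point at delete deltas)
    //   delta_mydb_tmp_compactor_t_1585212489000_result          (compacted delta contents)
    //   delete_delta_mydb_tmp_compactor_t_1585212489000_result   (compacted delete-delta contents)
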
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 41fdd7e..114b6f7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -52,7 +51,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
AcidUtils.Directory dir = AcidUtils
.getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
table.getParameters(), false);
- MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
+ QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
String tmpLocation = Util.generateTmpPath(storageDescriptor);
Path baseLocation = new Path(tmpLocation, "_base");
@@ -106,42 +105,36 @@ final class MmMajorQueryCompactor extends QueryCompactor {
private List<String> getCreateQueries(String tmpTableName, Table table,
StorageDescriptor storageDescriptor, String baseLocation) {
- return Lists.newArrayList(MmQueryCompactorUtils
- .getCreateQuery(tmpTableName, table, storageDescriptor, baseLocation, false, false));
+ return Lists.newArrayList(
+ new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MAJOR_INSERT_ONLY,
+ CompactionQueryBuilder.Operation.CREATE,
+ tmpTableName)
+ .setSourceTab(table)
+ .setStorageDescriptor(storageDescriptor)
+ .setLocation(baseLocation)
+ .build()
+ );
}
private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
- String fullName = t.getDbName() + "." + t.getTableName();
- // ideally we should make a special form of insert overwrite so that we:
- // 1) Could use fast merge path for ORC and RC.
- // 2) Didn't have to create a table.
-
- StringBuilder query = new StringBuilder("insert overwrite table " + tmpName + " ");
- StringBuilder filter = new StringBuilder();
- if (p != null) {
- filter = new StringBuilder(" where ");
- List<String> vals = p.getValues();
- List<FieldSchema> keys = t.getPartitionKeys();
- assert keys.size() == vals.size();
- for (int i = 0; i < keys.size(); ++i) {
- filter.append(i == 0 ? "`" : " and `").append(keys.get(i).getName()).append("`='").append(vals.get(i))
- .append("'");
- }
- query.append(" select ");
- // Use table descriptor for columns.
- List<FieldSchema> cols = t.getSd().getCols();
- for (int i = 0; i < cols.size(); ++i) {
- query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
- }
- } else {
- query.append("select *");
- }
- query.append(" from ").append(fullName).append(filter);
- return Lists.newArrayList(query.toString());
+ return Lists.newArrayList(
+ new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MAJOR_INSERT_ONLY,
+ CompactionQueryBuilder.Operation.INSERT,
+ tmpName)
+ .setSourceTab(t)
+ .setFromTableName(t.getTableName())
+ .setSourcePartition(p)
+ .build()
+ );
}
private List<String> getDropQueries(String tmpTableName) {
- return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableName);
+ return Lists.newArrayList(
+ new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MAJOR_INSERT_ONLY,
+ CompactionQueryBuilder.Operation.DROP,
+ tmpTableName).build());
}
-
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index feb667c..383891b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -36,10 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
/**
* Run a minor query compaction on an insert only (MM) table.
@@ -58,7 +54,7 @@ final class MmMinorQueryCompactor extends QueryCompactor {
AcidUtils.Directory dir = AcidUtils
.getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
Ref.from(false), false, table.getParameters(), false);
- MmQueryCompactorUtils.removeFilesForMmTable(hiveConf, dir);
+ QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
String tmpLocation = Util.generateTmpPath(storageDescriptor);
Path sourceTabLocation = new Path(tmpLocation);
Path resultTabLocation = new Path(tmpLocation, "_result");
@@ -66,14 +62,15 @@ final class MmMinorQueryCompactor extends QueryCompactor {
HiveConf driverConf = setUpDriverSession(hiveConf);
String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_";
- String tmpTableBase = tmpPrefix + System.currentTimeMillis();
+ String tmpTableName = tmpPrefix + System.currentTimeMillis();
+ String resultTmpTableName = tmpTableName + "_result";
List<String> createTableQueries =
- getCreateQueries(tmpTableBase, table, partition == null ? table.getSd() : partition.getSd(),
+ getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds);
- List<String> compactionQueries = getCompactionQueries(tmpTableBase, table.getSd());
- List<String> dropQueries = getDropQueries(tmpTableBase);
- runCompactionQueries(driverConf, tmpTableBase, storageDescriptor, writeIds, compactionInfo,
+ List<String> compactionQueries = getCompactionQueries(tmpTableName, resultTmpTableName, table);
+ List<String> dropQueries = getDropQueries(tmpTableName);
+ runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
createTableQueries, compactionQueries, dropQueries);
}
@@ -126,42 +123,45 @@ final class MmMinorQueryCompactor extends QueryCompactor {
private List<String> getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd,
String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir,
ValidWriteIdList validWriteIdList) {
- List<String> queries = new ArrayList<>();
- queries.add(
- MmQueryCompactorUtils.getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true, true));
- buildAlterTableQuery(tmpTableBase, dir, validWriteIdList).ifPresent(queries::add);
- queries.add(MmQueryCompactorUtils
- .getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false, false));
+ List<String> queries = Lists.newArrayList(
+ getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true),
+ getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false)
+ );
+ String alterQuery = buildAlterTableQuery(tmpTableBase, dir, validWriteIdList);
+ if (!alterQuery.isEmpty()) {
+ queries.add(alterQuery);
+ }
return queries;
}
+ private String getCreateQuery(String resultTableName, Table t, StorageDescriptor sd,
+ String location, boolean isPartitioned) {
+ return new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
+ CompactionQueryBuilder.Operation.CREATE,
+ resultTableName)
+ .setSourceTab(t)
+ .setStorageDescriptor(sd)
+ .setLocation(location)
+ .setPartitioned(isPartitioned)
+ .build();
+ }
+
/**
* Builds an alter table query, which adds partitions pointing to location of delta directories.
*
* @param tableName name of the temp table to be altered
* @param dir the parent directory of delta directories
* @param validWriteIdList valid write ids for the table/partition to compact
- * @return alter table statement wrapped in {@link Optional}.
+ * @return alter table statement.
*/
- private Optional<String> buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
+ private String buildAlterTableQuery(String tableName, AcidUtils.Directory dir,
ValidWriteIdList validWriteIdList) {
- if (!dir.getCurrentDirectories().isEmpty()) {
- long minWriteID =
- validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
- long highWatermark = validWriteIdList.getHighWatermark();
- List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
- delta -> delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID)
- .collect(Collectors.toList());
- if (!deltas.isEmpty()) {
- StringBuilder query = new StringBuilder().append("alter table ").append(tableName);
- query.append(" add ");
- deltas.forEach(
- delta -> query.append("partition (file_name='").append(delta.getPath().getName())
- .append("') location '").append(delta.getPath()).append("' "));
- return Optional.of(query.toString());
- }
- }
- return Optional.empty();
+ return new CompactionQueryBuilder(CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
+ CompactionQueryBuilder.Operation.ALTER, tableName)
+ .setDir(dir)
+ .setValidWriteIdList(validWriteIdList)
+ .build();
}
/**
@@ -171,24 +171,21 @@ final class MmMinorQueryCompactor extends QueryCompactor {
* <li>insert into table $tmpTableBase_result select `col_1`, .. from tmpTableBase</li>
* </ol>
*
- * @param tmpTableBase an unique identifier, which helps to find all the temporary tables
+ * @param sourceTmpTableName a unique identifier, which helps to find all the temporary tables
+ * @param resultTmpTableName name of the temporary result table
* @return list of compaction queries, always non-null
*/
- private List<String> getCompactionQueries(String tmpTableBase, StorageDescriptor sd) {
- String resultTableName = tmpTableBase + "_result";
- StringBuilder query = new StringBuilder().append("insert into table ").append(resultTableName)
- .append(" select ");
- List<FieldSchema> cols = sd.getCols();
- boolean isFirst = true;
- for (FieldSchema col : cols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("`").append(col.getName()).append("`");
- }
- query.append(" from ").append(tmpTableBase);
- return Lists.newArrayList(query.toString());
+ private List<String> getCompactionQueries(String sourceTmpTableName, String resultTmpTableName,
+ Table sourceTable) {
+ return Lists.newArrayList(
+ new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
+ CompactionQueryBuilder.Operation.INSERT,
+ resultTmpTableName)
+ .setFromTableName(sourceTmpTableName)
+ .setSourceTab(sourceTable)
+ .build()
+ );
}
/**
@@ -197,8 +194,17 @@ final class MmMinorQueryCompactor extends QueryCompactor {
* @return list of drop table statements, always non-null
*/
private List<String> getDropQueries(String tmpTableBase) {
- return Lists.newArrayList(MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase,
- MmQueryCompactorUtils.DROP_IF_EXISTS + tmpTableBase + "_result");
+ return Lists.newArrayList(
+ getDropQuery(tmpTableBase),
+ getDropQuery(tmpTableBase + "_result")
+ );
+ }
+
+ private String getDropQuery(String tableToDrop) {
+ return new CompactionQueryBuilder(
+ CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
+ CompactionQueryBuilder.Operation.DROP,
+ tableToDrop).build();
}
private HiveConf setUpDriverSession(HiveConf hiveConf) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
deleted file mode 100644
index 891696d..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmQueryCompactorUtils.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.txn.compactor;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.StatsSetupConst;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SkewedInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.ddl.table.create.show.ShowCreateTableOperation;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.util.DirectionUtils;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.HiveStringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-final class MmQueryCompactorUtils {
-
- private static final Logger LOG = LoggerFactory.getLogger(MmQueryCompactorUtils.class.getName());
- static final String DROP_IF_EXISTS = "drop table if exists ";
-
- private MmQueryCompactorUtils() {}
-
- /**
- * Creates a command to create a new table based on an example table (sourceTab).
- *
- * @param fullName of new table
- * @param sourceTab the table we are modeling the new table on
- * @param sd StorageDescriptor of the table or partition we are modeling the new table on
- * @param location of the new table
- * @param isPartitioned should the new table be partitioned
- * @param isExternal should the new table be external
- * @return query string creating the new table
- */
- static String getCreateQuery(String fullName, Table sourceTab, StorageDescriptor sd,
- String location, boolean isPartitioned, boolean isExternal) {
- StringBuilder query = new StringBuilder("create temporary ");
- if (isExternal) {
- query.append("external ");
- }
- query.append("table ").append(fullName).append("(");
- List<FieldSchema> cols = sourceTab.getSd().getCols();
- boolean isFirst = true;
- for (FieldSchema col : cols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("`").append(col.getName()).append("` ").append(col.getType());
- }
- query.append(") ");
-
- // Partitioning. Used for minor compaction.
- if (isPartitioned) {
- query.append(" PARTITIONED BY (`file_name` STRING) ");
- }
-
- // Bucketing.
- List<String> buckCols = sourceTab.getSd().getBucketCols();
- if (buckCols.size() > 0) {
- query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") ");
- List<Order> sortCols = sourceTab.getSd().getSortCols();
- if (sortCols.size() > 0) {
- query.append("SORTED BY (");
- isFirst = true;
- for (Order sortCol : sortCols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
- }
- query.append(") ");
- }
- query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS");
- }
-
- // Stored as directories. We don't care about the skew otherwise.
- if (sourceTab.getSd().isStoredAsSubDirectories()) {
- SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
- if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
- query.append(" SKEWED BY (").append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON ");
- isFirst = true;
- for (List<String> colValues : skewedInfo.getSkewedColValues()) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("('").append(StringUtils.join("','", colValues)).append("')");
- }
- query.append(") STORED AS DIRECTORIES");
- }
- }
-
- SerDeInfo serdeInfo = sd.getSerdeInfo();
- Map<String, String> serdeParams = serdeInfo.getParameters();
- query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
- .append("'");
- String sh = sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
- assert sh == null; // Not supposed to be a compactable table.
- if (!serdeParams.isEmpty()) {
- ShowCreateTableOperation.appendSerdeParams(query, serdeParams);
- }
- query.append("STORED AS INPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getInputFormat()))
- .append("' OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(sd.getOutputFormat()))
- .append("' LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("' TBLPROPERTIES (");
- // Exclude all standard table properties.
- Set<String> excludes = getHiveMetastoreConstants();
- excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
- isFirst = true;
- for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
- if (e.getValue() == null) {
- continue;
- }
- if (excludes.contains(e.getKey())) {
- continue;
- }
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("'").append(e.getKey()).append("'='").append(HiveStringUtils.escapeHiveCommand(e.getValue()))
- .append("'");
- }
- if (!isFirst) {
- query.append(", ");
- }
- query.append("'transactional'='false')");
- return query.toString();
-
- }
-
- private static Set<String> getHiveMetastoreConstants() {
- Set<String> result = new HashSet<>();
- for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
- if (!Modifier.isStatic(f.getModifiers())) {
- continue;
- }
- if (!Modifier.isFinal(f.getModifiers())) {
- continue;
- }
- if (!String.class.equals(f.getType())) {
- continue;
- }
- f.setAccessible(true);
- try {
- result.add((String) f.get(null));
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
- return result;
- }
-
- /**
- * Remove the delta directories of aborted transactions.
- */
- static void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
- List<Path> filesToDelete = dir.getAbortedDirectories();
- if (filesToDelete.size() < 1) {
- return;
- }
- LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
- FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
- for (Path dead : filesToDelete) {
- LOG.debug("Going to delete path " + dead.toString());
- fs.delete(dead, true);
- }
- }
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 7a9e48f..7f3ccfa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -23,12 +23,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -161,31 +161,6 @@ abstract class QueryCompactor {
}
}
- /**
- * Get a create temporary table query string with Orc ACID columns.
- * @param tableName name of the new temporary table
- * @param table the table where the compaction is running
- * @return create query
- */
- static String getCreateTempTableQueryWithAcidColumns(String tableName, Table table) {
- StringBuilder query = new StringBuilder("create temporary external table ").append(tableName).append(" (");
- // Acid virtual columns
- query.append("`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` "
- + "bigint, `row` struct<");
- List<FieldSchema> cols = table.getSd().getCols();
- boolean isFirst = true;
- // Actual columns
- for (FieldSchema col : cols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
- }
- query.append(">)");
- return query.toString();
- }
-
/**
* Remove the root directory of a table if it's empty.
* @param conf the Hive configuration
@@ -203,5 +178,20 @@ abstract class QueryCompactor {
}
}
}
+ /**
+ * Remove the delta directories of aborted transactions.
+ */
+ static void removeFilesForMmTable(HiveConf conf, AcidUtils.Directory dir) throws IOException {
+ List<Path> filesToDelete = dir.getAbortedDirectories();
+ if (filesToDelete.size() < 1) {
+ return;
+ }
+ LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + dir);
+ FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+ for (Path dead : filesToDelete) {
+ LOG.debug("Going to delete path " + dead.toString());
+ fs.delete(dead, true);
+ }
+ }
}
}