You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/05/21 07:17:38 UTC
[hive] branch master updated: HIVE-22971: Eliminate file rename in
insert-only compactor (Karen Coppage via Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 29db9e6 HIVE-22971: Eliminate file rename in insert-only compactor (Karen Coppage via Peter Vary)
29db9e6 is described below
commit 29db9e61c6164fe8d2e21c10e57fe00df0b6cea4
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Thu May 21 09:17:03 2020 +0200
HIVE-22971: Eliminate file rename in insert-only compactor (Karen Coppage via Peter Vary)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../hive/ql/txn/compactor/TestCompactor.java | 131 +++++++++++++--------
.../ql/txn/compactor/TestCrudCompactorOnTez.java | 4 +-
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 13 +-
.../hive/ql/txn/compactor/MajorQueryCompactor.java | 16 +--
.../hive/ql/txn/compactor/MinorQueryCompactor.java | 38 +++---
.../ql/txn/compactor/MmMajorQueryCompactor.java | 37 ++----
.../ql/txn/compactor/MmMinorQueryCompactor.java | 60 +++-------
.../hive/ql/txn/compactor/QueryCompactor.java | 54 +++++++--
.../ql/txn/compactor/QueryCompactorFactory.java | 6 +-
10 files changed, 195 insertions(+), 169 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 5a39006..26eeeb7 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2665,8 +2665,9 @@ public class HiveConf extends Configuration {
"The default value is 1000000, since the data limit of a znode is 1MB"),
HIVE_MM_ALLOW_ORIGINALS("hive.mm.allow.originals", false,
"Whether to allow original files in MM tables. Conversion to MM may be expensive if\n" +
- "this is set to false, however unless MAPREDUCE-7086 fix is present, queries that\n" +
- "read MM tables with original files will fail. The default in Hive 3.0 is false."),
+ "this is set to false, however unless MAPREDUCE-7086 fix is present (hadoop 3.1.1+),\n" +
+ "queries that read non-orc MM tables with original files will fail. The default in\n" +
+ "Hive 3.0 is false."),
// Zookeeper related configs
HIVE_ZOOKEEPER_USE_KERBEROS("hive.zookeeper.kerberos.enabled", true,
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index b9db1d1..c687f14 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -818,96 +818,131 @@ public class TestCompactor {
verifyHasBase(table.getSd(), fs, "base_0000002_v0000006");
}
- @Test
- public void mmTableOriginalsOrc() throws Exception {
- mmTableOriginals("ORC");
+ @Test public void mmTableOriginalsMajorOrc() throws Exception {
+ mmTableOriginalsMajor("orc", true);
}
- @Test
- public void mmTableOriginalsText() throws Exception {
- mmTableOriginals("TEXTFILE");
+ @Test public void mmTableOriginalsMajorText() throws Exception {
+ mmTableOriginalsMajor("textfile", false);
}
- private void mmTableOriginals(String format) throws Exception {
- // Originals split won't work due to MAPREDUCE-7086 issue in FileInputFormat.
- boolean isBrokenUntilMapreduce7086 = "TEXTFILE".equals(format);
+ /**
+ * Major compact an mm table that contains original files.
+ */
+ private void mmTableOriginalsMajor(String format, boolean allowOriginals) throws Exception {
+ driver.getConf().setBoolVar(ConfVars.HIVE_MM_ALLOW_ORIGINALS, allowOriginals);
String dbName = "default";
String tblName = "mm_nonpart";
executeStatementOnDriver("drop table if exists " + tblName, driver);
- executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " +
- format + " TBLPROPERTIES ('transactional'='false')", driver);
- IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format
+ + " TBLPROPERTIES ('transactional'='false')", driver);
Table table = msClient.getTable(dbName, tblName);
executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
- executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver);
- executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
- + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+ + " UNION ALL SELECT a,b FROM " + tblName, driver);
verifyFooBarResult(tblName, 3);
FileSystem fs = FileSystem.get(conf);
executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
- + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
+ + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
- verifyFooBarResult(tblName, 3);
+ if (allowOriginals) {
+ verifyDeltaCount(table.getSd(), fs, 0);
+ verifyFooBarResult(tblName, 3);
- runMajorCompaction(dbName, tblName);
- verifyFooBarResult(tblName, 3);
- verifyHasBase(table.getSd(), fs, "base_0000001_v0000009");
+ runMajorCompaction(dbName, tblName);
+ verifyFooBarResult(tblName, 3);
+ verifyHasBase(table.getSd(), fs, "base_0000001_v0000009");
+ } else {
+ verifyDeltaCount(table.getSd(), fs, 1);
+ // 1 delta dir won't be compacted. Skip testing major compaction.
+ }
+ }
+
+ @Test public void mmMajorOriginalsDeltasOrc() throws Exception {
+ mmMajorOriginalsDeltas("orc", true);
+ }
+
+ @Test public void mmMajorOriginalsDeltasText() throws Exception {
+ mmMajorOriginalsDeltas("textfile", false);
+ }
- // Try with an extra delta.
+ /**
+ * Major compact an mm table with both original and delta files.
+ */
+ private void mmMajorOriginalsDeltas(String format, boolean allowOriginals) throws Exception {
+ driver.getConf().setBoolVar(ConfVars.HIVE_MM_ALLOW_ORIGINALS, allowOriginals);
+ String dbName = "default";
+ String tblName = "mm_nonpart";
+ FileSystem fs = FileSystem.get(conf);
executeStatementOnDriver("drop table if exists " + tblName, driver);
- executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " +
- format + " TBLPROPERTIES ('transactional'='false')", driver);
- table = msClient.getTable(dbName, tblName);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format
+ + " TBLPROPERTIES ('transactional'='false')", driver);
+ Table table = msClient.getTable(dbName, tblName);
executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
- executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver);
- executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
- + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+ + " UNION ALL SELECT a,b FROM " + tblName, driver);
verifyFooBarResult(tblName, 3);
executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
- + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
- executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
- + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
-
- // Neither select nor compaction (which is a select) wil work after this.
- if (isBrokenUntilMapreduce7086) return;
+ + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+ + " UNION ALL SELECT a,b FROM " + tblName, driver);
verifyFooBarResult(tblName, 9);
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 9);
- verifyHasBase(table.getSd(), fs, "base_0000002_v0000023");
+ verifyHasBase(table.getSd(), fs, "base_0000002_v0000010");
+ }
+ @Test public void mmMajorOriginalsBaseOrc() throws Exception {
+ mmMajorOriginalsBase("orc", true);
+ }
+
+ @Test public void mmMajorOriginalsBaseText() throws Exception {
+ mmMajorOriginalsBase("textfile", false);
+ }
+
+ /**
+ * Insert overwrite and major compact an mm table with only original files.
+ *
+ * @param format file format for table
+ * @throws Exception
+ */
+ private void mmMajorOriginalsBase(String format, boolean allowOriginals) throws Exception {
+ driver.getConf().setBoolVar(ConfVars.HIVE_MM_ALLOW_ORIGINALS, allowOriginals);
// Try with an extra base.
+ String dbName = "default";
+ String tblName = "mm_nonpart";
+ FileSystem fs = FileSystem.get(conf);
executeStatementOnDriver("drop table if exists " + tblName, driver);
- executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " +
- format + " TBLPROPERTIES ('transactional'='false')", driver);
- table = msClient.getTable(dbName, tblName);
+ executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) STORED AS " + format
+ + " TBLPROPERTIES ('transactional'='false')", driver);
+ Table table = msClient.getTable(dbName, tblName);
executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
- executeStatementOnDriver("INSERT INTO " + tblName +" (a,b) VALUES(2, 'bar')", driver);
- executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM "
- + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + " (a,b) VALUES(2, 'bar')", driver);
+ executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) SELECT a,b FROM " + tblName
+ + " UNION ALL SELECT a,b FROM " + tblName, driver);
verifyFooBarResult(tblName, 3);
executeStatementOnDriver("ALTER TABLE " + tblName + " SET TBLPROPERTIES "
- + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
- executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM "
- + tblName + " UNION ALL SELECT a,b FROM " + tblName, driver);
+ + "('transactional'='true', 'transactional_properties'='insert_only')", driver);
+ executeStatementOnDriver("INSERT OVERWRITE TABLE " + tblName + " SELECT a,b FROM " + tblName
+ + " UNION ALL SELECT a,b FROM " + tblName, driver);
verifyFooBarResult(tblName, 6);
runMajorCompaction(dbName, tblName);
verifyFooBarResult(tblName, 6);
verifyHasBase(table.getSd(), fs, "base_0000002");
-
- msClient.close();
}
-
@Test
public void mmTableBucketed() throws Exception {
String dbName = "default";
@@ -1059,8 +1094,8 @@ public class TestCompactor {
}
- private void verifyFooBarResult(String tblName, int count) throws Exception, IOException {
- List<String> valuesReadFromHiveDriver = new ArrayList<String>();
+ private void verifyFooBarResult(String tblName, int count) throws Exception {
+ List<String> valuesReadFromHiveDriver = new ArrayList<>();
executeStatementOnDriver("SELECT a,b FROM " + tblName, driver);
driver.getResults(valuesReadFromHiveDriver);
Assert.assertEquals(2 * count, valuesReadFromHiveDriver.size());
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
index 89920cc..7bbc4bc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java
@@ -1008,12 +1008,12 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest {
hiveConf.set(ValidTxnList.VALID_TXNS_KEY, "8:9223372036854775807::");
// Check for default case.
- qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries);
+ qc.runCompactionQueries(hiveConf, null, sdMock, null, null, null, emptyQueries, emptyQueries, emptyQueries);
Assert.assertEquals("all", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
// Check for case where hive.llap.io.etl.skip.format is explicitly set to none - as to always use cache.
hiveConf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "none");
- qc.runCompactionQueries(hiveConf, null, sdMock, null, null, emptyQueries, emptyQueries, emptyQueries);
+ qc.runCompactionQueries(hiveConf, null, sdMock, null, null, null, emptyQueries, emptyQueries, emptyQueries);
Assert.assertEquals("none", hiveConf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT));
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 05ea38c..018c733 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.regex.Matcher;
import org.apache.hadoop.conf.Configuration;
@@ -133,7 +134,7 @@ public class CompactorMR {
job.setOutputCommitter(CompactorOutputCommitter.class);
job.set(FINAL_LOCATION, sd.getLocation());
- job.set(TMP_LOCATION, QueryCompactor.Util.generateTmpPath(sd));
+ job.set(TMP_LOCATION, generateTmpPath(sd));
job.set(INPUT_FORMAT_CLASS_NAME, sd.getInputFormat());
job.set(OUTPUT_FORMAT_CLASS_NAME, sd.getOutputFormat());
job.setBoolean(IS_COMPRESSED, sd.isCompressed());
@@ -230,6 +231,7 @@ public class CompactorMR {
*/
QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
if (queryCompactor != null) {
+ LOG.info("Will compact with " + queryCompactor.getClass().getName());
queryCompactor.runCompaction(conf, t, p, sd, writeIds, ci);
return;
}
@@ -296,6 +298,15 @@ public class CompactorMR {
}
/**
+ * Generate a random tmp path, under the provided storage.
+ * @param sd storage descriptor, must not be null.
+ * @return path, never null
+ */
+ private static String generateTmpPath(StorageDescriptor sd) {
+ return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+ }
+
+ /**
* @param baseDir if not null, it's either table/partition root folder or base_xxxx.
* If it's base_xxxx, it's in dirsToSearch, else the actual original files
* (all leaves recursively) are in the dirsToSearch list
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index c70d4f3..9558409 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -25,9 +25,9 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+
import java.io.IOException;
import java.util.List;
@@ -53,20 +53,14 @@ final class MajorQueryCompactor extends QueryCompactor {
String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_";
String tmpTableName = tmpPrefix + System.currentTimeMillis();
-
- long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
- long highWaterMark = writeIds.getHighWatermark();
- long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
- AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true)
- .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
- .maximumWriteId(highWaterMark).statementId(-1).visibilityTxnId(compactorTxnId);
- Path tmpTablePath = AcidUtils.baseOrDeltaSubdirPath(new Path(storageDescriptor.getLocation()), options);
+ Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
+ conf, true, false, false);
List<String> createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString());
List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
List<String> dropQueries = getDropQueries(tmpTableName);
- runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
- compactionQueries, dropQueries);
+ runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
+ Lists.newArrayList(tmpTablePath), createQueries, compactionQueries, dropQueries);
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index 4d0e5f7..d83a50f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hive.common.util.Ref;
@@ -45,7 +44,7 @@ final class MinorQueryCompactor extends QueryCompactor {
@Override
void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
- ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException {
+ ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
LOG.info("Running query based minor compaction");
AcidUtils
.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
@@ -61,14 +60,22 @@ final class MinorQueryCompactor extends QueryCompactor {
String tmpTableName =
table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis();
- List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, conf, storageDescriptor);
+ Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
+ writeIds, conf, false, false, false);
+ Path resultDeleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
+ writeIds, conf, false, true, false);
+
+ List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds,
+ resultDeltaDir, resultDeleteDeltaDir);
List<String> compactionQueries = getCompactionQueries(tmpTableName, table, writeIds);
List<String> dropQueries = getDropQueries(tmpTableName);
- runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
+ runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
+ Lists.newArrayList(resultDeltaDir, resultDeleteDeltaDir), createQueries,
compactionQueries, dropQueries);
}
+
@Override
protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
@@ -89,16 +96,13 @@ final class MinorQueryCompactor extends QueryCompactor {
* @param tempTableBase an unique identifier which is used to create delta/delete-delta temp tables
* @param dir the directory, where the delta directories resides
* @param writeIds list of valid write ids, used to filter out delta directories which are not relevant for compaction
- * @param conf hive configuration
- * @param storageDescriptor this is the resolved storage descriptor
+ * @param tmpTableResultLocation result delta dir
+ * @param tmpTableDeleteResultLocation result delete delta dir
* @return list of create/alter queries, always non-null
*/
private List<String> getCreateQueries(Table table, String tempTableBase, AcidUtils.Directory dir,
- ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) {
+ ValidWriteIdList writeIds, Path tmpTableResultLocation, Path tmpTableDeleteResultLocation) {
List<String> queries = new ArrayList<>();
- long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
- long highWatermark = writeIds.getHighWatermark();
- long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
// create delta temp table
String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase;
queries.add(buildCreateTableQuery(table, tmpTableName, true, false, null));
@@ -106,15 +110,9 @@ final class MinorQueryCompactor extends QueryCompactor {
if (!alterQuery.isEmpty()) {
queries.add(alterQuery);
}
- AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false)
- .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
- .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
- Path location = new Path(storageDescriptor.getLocation());
- String tmpTableResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
- options).toString();
// create delta result temp table
queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, true,
- tmpTableResultLocation));
+ tmpTableResultLocation.toString()));
// create delete delta temp tables
String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase;
@@ -124,13 +122,9 @@ final class MinorQueryCompactor extends QueryCompactor {
if (!alterQuery.isEmpty()) {
queries.add(alterQuery);
}
- options = new AcidOutputFormat.Options(conf).writingBase(false).writingDeleteDelta(true).isCompressed(false)
- .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
- String tmpTableDeleteResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
- options).toString();
// create delete delta result temp table
queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result", false, true,
- tmpTableDeleteResultLocation));
+ tmpTableDeleteResultLocation.toString()));
return queries;
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 724a437..5e11d8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,9 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
@@ -53,9 +50,6 @@ final class MmMajorQueryCompactor extends QueryCompactor {
table.getParameters(), false);
QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
- String tmpLocation = Util.generateTmpPath(storageDescriptor);
- Path baseLocation = new Path(tmpLocation, "_base");
-
// Set up the session for driver.
HiveConf driverConf = new HiveConf(hiveConf);
driverConf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
@@ -64,13 +58,15 @@ final class MmMajorQueryCompactor extends QueryCompactor {
// "insert overwrite directory" command if there were no bucketing or list bucketing.
String tmpPrefix = table.getDbName() + ".tmp_compactor_" + table.getTableName() + "_";
String tmpTableName = tmpPrefix + System.currentTimeMillis();
- List<String> createTableQueries =
- getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
- baseLocation.toString());
+ Path resultBaseDir = QueryCompactor.Util.getCompactionResultDir(
+ storageDescriptor, writeIds, driverConf, true, true, false);
+
+ List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor,
+ resultBaseDir.toString());
List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
List<String> dropQueries = getDropQueries(tmpTableName);
runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
- createTableQueries, compactionQueries, dropQueries);
+ Lists.newArrayList(resultBaseDir), createTableQueries, compactionQueries, dropQueries);
}
/**
@@ -81,26 +77,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
@Override
protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
- org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
- String from = tempTable.getSd().getLocation();
- Path fromPath = new Path(from), toPath = new Path(dest);
- FileSystem fs = fromPath.getFileSystem(conf);
- // Assume the high watermark can be used as maximum transaction ID.
- //todo: is that true? can it be aborted? does it matter for compaction? probably OK since
- //getAcidState() doesn't check if X is valid in base_X_vY for compacted base dirs.
- long maxTxn = actualWriteIds.getHighWatermark();
- AcidOutputFormat.Options options =
- new AcidOutputFormat.Options(conf).writingBase(true).isCompressed(false).maximumWriteId(maxTxn).bucket(0)
- .statementId(-1).visibilityTxnId(compactorTxnId);
- Path newBaseDir = AcidUtils.createFilename(toPath, options).getParent();
- if (!fs.exists(fromPath)) {
- LOG.info(from + " not found. Assuming 0 splits. Creating " + newBaseDir);
- fs.mkdirs(newBaseDir);
- return;
- }
- LOG.info("Moving contents of " + from + " to " + dest);
- fs.rename(fromPath, newBaseDir);
- fs.delete(fromPath, true);
+ Util.cleanupEmptyDir(conf, tmpTableName);
}
private List<String> getCreateQueries(String tmpTableName, Table table,
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index 1cd95f80..1bdec7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.collect.Lists;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -26,9 +25,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
@@ -51,27 +48,25 @@ final class MmMinorQueryCompactor extends QueryCompactor {
"Going to delete directories for aborted transactions for MM table " + table.getDbName()
+ "." + table.getTableName());
- AcidUtils.Directory dir = AcidUtils
- .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
- Ref.from(false), false, table.getParameters(), false);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null,
+ new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
+ Ref.from(false), false, table.getParameters(), false);
QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
- String tmpLocation = Util.generateTmpPath(storageDescriptor);
- Path sourceTabLocation = new Path(tmpLocation);
- Path resultTabLocation = new Path(tmpLocation, "_result");
HiveConf driverConf = setUpDriverSession(hiveConf);
String tmpPrefix = table.getDbName() + ".tmp_minor_compactor_" + table.getTableName() + "_";
String tmpTableName = tmpPrefix + System.currentTimeMillis();
String resultTmpTableName = tmpTableName + "_result";
+ Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, driverConf,
+ false, false, false);
- List<String> createTableQueries =
- getCreateQueries(tmpTableName, table, partition == null ? table.getSd() : partition.getSd(),
- sourceTabLocation.toString(), resultTabLocation.toString(), dir, writeIds);
+ List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor, dir,
+ writeIds, resultDeltaDir);
List<String> compactionQueries = getCompactionQueries(tmpTableName, resultTmpTableName, table);
List<String> dropQueries = getDropQueries(tmpTableName);
runCompactionQueries(driverConf, tmpTableName, storageDescriptor, writeIds, compactionInfo,
- createTableQueries, compactionQueries, dropQueries);
+ Lists.newArrayList(resultDeltaDir), createTableQueries, compactionQueries, dropQueries);
}
/**
@@ -79,26 +74,7 @@ final class MmMinorQueryCompactor extends QueryCompactor {
*/
@Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
- org.apache.hadoop.hive.ql.metadata.Table resultTable =
- Hive.get().getTable(tmpTableName + "_result");
- String from = resultTable.getSd().getLocation();
- Path fromPath = new Path(from);
- Path toPath = new Path(dest);
- FileSystem fs = fromPath.getFileSystem(conf);
- long maxTxn = actualWriteIds.getHighWatermark();
- AcidOutputFormat.Options options =
- new AcidOutputFormat.Options(conf).writingBase(false).isCompressed(false)
- .minimumWriteId(1).maximumWriteId(maxTxn).bucket(0).statementId(-1)
- .visibilityTxnId(compactorTxnId);
- Path newDeltaDir = AcidUtils.createFilename(toPath, options).getParent();
- if (!fs.exists(fromPath)) {
- LOG.info(from + " not found. Assuming 0 splits. Creating " + newDeltaDir);
- fs.mkdirs(newDeltaDir);
- return;
- }
- LOG.info("Moving contents of " + from + " to " + dest);
- fs.rename(fromPath, newDeltaDir);
- fs.delete(fromPath, true);
+ Util.cleanupEmptyDir(conf, tmpTableName);
}
/**
@@ -114,32 +90,30 @@ final class MmMinorQueryCompactor extends QueryCompactor {
* @param tmpTableBase name of the first temp table (second will be $tmpTableBase_result)
* @param t Table to compact
* @param sd storage descriptor of table or partition to compact
- * @param sourceTabLocation location the "source table" (temp table 1) should go
- * @param resultTabLocation location the "result table (temp table 2) should go
* @param dir the parent directory of delta directories
- * @param validWriteIdList valid write ids for the table/partition to compact
+ * @param writeIds ValidWriteIdList for the table/partition we are compacting
+ * @param resultDeltaDir the final location for the result temp table, i.e. the compacted delta directory
* @return List of 3 query strings: 2 create table, 1 alter table
*/
private List<String> getCreateQueries(String tmpTableBase, Table t, StorageDescriptor sd,
- String sourceTabLocation, String resultTabLocation, AcidUtils.Directory dir,
- ValidWriteIdList validWriteIdList) {
+ AcidUtils.Directory dir, ValidWriteIdList writeIds, Path resultDeltaDir) {
List<String> queries = Lists.newArrayList(
- getCreateQuery(tmpTableBase, t, sd, sourceTabLocation, true),
- getCreateQuery(tmpTableBase + "_result", t, sd, resultTabLocation, false)
+ getCreateQuery(tmpTableBase, t, sd, null, true),
+ getCreateQuery(tmpTableBase + "_result", t, sd, resultDeltaDir.toString(), false)
);
- String alterQuery = buildAlterTableQuery(tmpTableBase, dir, validWriteIdList);
+ String alterQuery = buildAlterTableQuery(tmpTableBase, dir, writeIds);
if (!alterQuery.isEmpty()) {
queries.add(alterQuery);
}
return queries;
}
- private String getCreateQuery(String resultTableName, Table t, StorageDescriptor sd,
+ private String getCreateQuery(String newTableName, Table t, StorageDescriptor sd,
String location, boolean isPartitioned) {
return new CompactionQueryBuilder(
CompactionQueryBuilder.CompactionType.MINOR_INSERT_ONLY,
CompactionQueryBuilder.Operation.CREATE,
- resultTableName)
+ newTableName)
.setSourceTab(t)
.setStorageDescriptor(sd)
.setLocation(location)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 7f3ccfa..1f732f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.DriverUtils;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -38,7 +39,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
-import java.util.UUID;
/**
* Common interface for query based compactions.
@@ -82,6 +82,7 @@ abstract class QueryCompactor {
* @param storageDescriptor this is the resolved storage descriptor.
* @param writeIds valid write IDs used to filter rows while they're being read for compaction.
* @param compactionInfo provides info about the type of compaction.
+ * @param resultDirs the delta/base directories that are created as the result of compaction.
* @param createQueries collection of queries which creates the temporary tables.
* @param compactionQueries collection of queries which use data from the original table and write into temporary
* tables.
@@ -89,8 +90,9 @@ abstract class QueryCompactor {
* @throws IOException error during the run of the compaction.
*/
void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor,
- ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<String> createQueries,
- List<String> compactionQueries, List<String> dropQueries) throws IOException {
+ ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<Path> resultDirs,
+ List<String> createQueries, List<String> compactionQueries, List<String> dropQueries)
+ throws IOException {
Util.disableLlapCaching(conf);
String user = UserGroupInformation.getCurrentUser().getShortUserName();
SessionState sessionState = DriverUtils.setUpSessionState(conf, user, true);
@@ -118,6 +120,7 @@ abstract class QueryCompactor {
commitCompaction(storageDescriptor.getLocation(), tmpTableName, conf, writeIds, compactorTxnId);
} catch (HiveException e) {
LOG.error("Error doing query based {} compaction", compactionInfo.isMajorCompaction() ? "major" : "minor", e);
+ removeResultDirs(resultDirs, conf);
throw new IOException(e);
} finally {
try {
@@ -135,16 +138,51 @@ abstract class QueryCompactor {
}
/**
+ * Call in case compaction failed. Removes the new empty compacted delta/base.
+ * Cleaner would handle this later but clean up now just in case.
+ */
+ private void removeResultDirs(List<Path> resultDirPaths, HiveConf conf) throws IOException {
+ for (Path path : resultDirPaths) {
+ LOG.info("Compaction failed, removing directory: " + path.toString());
+ FileSystem fs = path.getFileSystem(conf);
+ if (!fs.listFiles(path, false).hasNext()) {
+ fs.delete(path, true);
+ }
+ }
+ }
+
+ /**
* Collection of some helper functions.
*/
static class Util {
+
/**
- * Generate a random tmp path, under the provided storage.
- * @param sd storage descriptor, must be not null.
- * @return path, always not null
+ * Get the path of the base, delta, or delete delta directory that will be the final
+ * destination of the files during compaction.
+ *
+ * @param sd storage descriptor of table or partition to compact
+ * @param writeIds list of valid writeids
+ * @param conf HiveConf
+ * @param writingBase if true, we are creating a base directory, otherwise a delta
+ * @param createDeleteDelta if true, the delta dir we are creating is a delete delta
+ * @param bucket0 whether to specify 0 as the bucketid
+ *
+ * @return Path of new base/delta/delete delta directory
*/
- static String generateTmpPath(StorageDescriptor sd) {
- return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
+ static Path getCompactionResultDir(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf,
+ boolean writingBase, boolean createDeleteDelta, boolean bucket0) {
+ long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+ long highWatermark = writeIds.getHighWatermark();
+ long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+ AcidOutputFormat.Options options =
+ new AcidOutputFormat.Options(conf).isCompressed(false).minimumWriteId(minOpenWriteId)
+ .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId)
+ .writingBase(writingBase).writingDeleteDelta(createDeleteDelta);
+ if (bucket0) {
+ options = options.bucket(0);
+ }
+ Path location = new Path(sd.getLocation());
+ return AcidUtils.baseOrDeltaSubdirPath(location, options);
}
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
index 6542eef..93dd85b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactorFactory.java
@@ -40,7 +40,9 @@ final class QueryCompactorFactory {
* <br>
* {@link MinorQueryCompactor} - handles query based minor compaction
* <br>
- * {@link MmMajorQueryCompactor} - handles query based minor compaction for micro-managed tables
+ * {@link MmMajorQueryCompactor} - handles query based major compaction for micro-managed tables
+ * <br>
+ * {@link MmMinorQueryCompactor} - handles query based minor compaction for micro-managed tables
* <br>
* </p>
* @param table the table, on which the compaction should be running, must be not null.
@@ -55,7 +57,7 @@ final class QueryCompactorFactory {
return new MajorQueryCompactor();
} else if (!compactionInfo.isMajorCompaction() && "tez"
.equalsIgnoreCase(HiveConf.getVar(configuration, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE))) {
- // query based minor compaction is only supported on tez
+ // query based minor compaction is only supported on tez
return new MinorQueryCompactor();
}
}