You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/24 18:06:57 UTC
hive git commit: HIVE-14881 : integrate MM tables into ACID: merge cleaner into ACID threads (Wei Zheng)
Repository: hive
Updated Branches:
refs/heads/hive-14535 ed29d1b80 -> 93aec1217
HIVE-14881 : integrate MM tables into ACID: merge cleaner into ACID threads (Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/93aec121
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/93aec121
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/93aec121
Branch: refs/heads/hive-14535
Commit: 93aec1217b29d9026d258771bf6bbcd0f2ae8f61
Parents: ed29d1b
Author: Wei Zheng <we...@apache.org>
Authored: Wed May 24 11:05:01 2017 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Wed May 24 11:05:01 2017 -0700
----------------------------------------------------------------------
.../hadoop/hive/metastore/txn/TxnUtils.java | 2 +-
.../org/apache/hadoop/hive/ql/exec/DDLTask.java | 2 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 32 ++++++---
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 2 +-
.../hive/ql/txn/compactor/CompactorMR.java | 36 ++++++++++
.../hadoop/hive/ql/txn/compactor/Initiator.java | 6 ++
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 70 ++++++++++++++++++--
7 files changed, 134 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 6e0070b..b2ced6b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -99,7 +99,7 @@ public class TxnUtils {
}
highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
BitSet bitSet = new BitSet(exceptions.length);
- bitSet.set(0, bitSet.length()); // for ValidCompactorTxnList, everything in exceptions are aborted
+ bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything in exceptions are aborted
return new ValidCompactorTxnList(exceptions, bitSet, highWater);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 733595c..f67831e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -1860,7 +1860,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
private int compact(Hive db, AlterTableSimpleDesc desc) throws HiveException {
Table tbl = db.getTable(desc.getTableName());
- if (!AcidUtils.isFullAcidTable(tbl)) {
+ if (!AcidUtils.isFullAcidTable(tbl) && !MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())) {
throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, tbl.getDbName(),
tbl.getTableName());
}
http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 902caa3..e723e2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -534,6 +534,12 @@ public class AcidUtils {
* more up to date ones. Not {@code null}.
*/
List<FileStatus> getObsolete();
+
+ /**
+ * Get the list of directories that has nothing but aborted transactions.
+ * @return the list of aborted directories
+ */
+ List<FileStatus> getAbortedDirectories();
}
public static class ParsedDelta implements Comparable<ParsedDelta> {
@@ -795,21 +801,22 @@ public class AcidUtils {
boolean useFileIds,
boolean ignoreEmptyFiles
) throws IOException {
- return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles);
+ return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles, null);
}
public static Directory getAcidState(Path directory,
Configuration conf,
ValidTxnList txnList,
Ref<Boolean> useFileIds,
- boolean ignoreEmptyFiles
- ) throws IOException {
+ boolean ignoreEmptyFiles,
+ Map<String, String> tblproperties) throws IOException {
FileSystem fs = directory.getFileSystem(conf);
// The following 'deltas' includes all kinds of delta files including insert & delete deltas.
final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
List<ParsedDelta> working = new ArrayList<ParsedDelta>();
List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
final List<FileStatus> obsolete = new ArrayList<FileStatus>();
+ final List<FileStatus> abortedDirectories = new ArrayList<>();
List<HdfsFileStatusWithId> childrenWithId = null;
Boolean val = useFileIds.value;
if (val == null || val) {
@@ -829,14 +836,14 @@ public class AcidUtils {
final List<HdfsFileStatusWithId> original = new ArrayList<>();
if (childrenWithId != null) {
for (HdfsFileStatusWithId child : childrenWithId) {
- getChildState(child.getFileStatus(), child, txnList, working,
- originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles);
+ getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original,
+ obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
}
} else {
List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
for (FileStatus child : children) {
- getChildState(
- child, null, txnList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles);
+ getChildState(child, null, txnList, working, originalDirectories, original, obsolete,
+ bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
}
}
@@ -946,6 +953,11 @@ public class AcidUtils {
public List<FileStatus> getObsolete() {
return obsolete;
}
+
+ @Override
+ public List<FileStatus> getAbortedDirectories() {
+ return abortedDirectories;
+ }
};
}
/**
@@ -966,7 +978,7 @@ public class AcidUtils {
private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
- boolean ignoreEmptyFiles) throws IOException {
+ boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties) throws IOException {
Path p = child.getPath();
String fn = p.getName();
if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -995,6 +1007,10 @@ public class AcidUtils {
String deltaPrefix =
(fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
ParsedDelta delta = parseDelta(child, deltaPrefix);
+ if (tblproperties != null && MetaStoreUtils.isInsertOnlyTable(tblproperties) &&
+ ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) {
+ aborted.add(child);
+ }
if (txnList.isTxnRangeValid(delta.minTransaction,
delta.maxTransaction) !=
ValidTxnList.RangeResponse.NONE) {
http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8fb7211..db8c23f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1094,7 +1094,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private AcidDirInfo callInternal() throws IOException {
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
- context.transactionList, useFileIds, true);
+ context.transactionList, useFileIds, true, null);
Path base = dirInfo.getBaseDirectory();
// find the base files (original or new style)
List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>();
http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index f83b6db..c0b39ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.common.ValidCompactorTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -70,6 +71,7 @@ import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -204,6 +206,16 @@ public class CompactorMR {
if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
}
+
+ // For MM tables we don't need to launch MR jobs as there is no compaction needed.
+ // We just need to delete the directories for aborted transactions.
+ if (MetaStoreUtils.isInsertOnlyTable(t.getParameters())) {
+ LOG.debug("Going to delete directories for aborted transactions for MM table "
+ + t.getDbName() + "." + t.getTableName());
+ removeFiles(conf, sd.getLocation(), txns, t);
+ return;
+ }
+
JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci);
// Figure out and encode what files we need to read. We do this here (rather than in
@@ -344,6 +356,30 @@ public class CompactorMR {
HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
}
+ // Remove the directories for aborted transactions only
+ private void removeFiles(HiveConf conf, String location, ValidTxnList txnList, Table t)
+ throws IOException {
+ AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList,
+ Ref.from(false), false, t.getParameters());
+ // For MM table, we only want to delete delta dirs for aborted txns.
+ List<FileStatus> abortedDirs = dir.getAbortedDirectories();
+ List<Path> filesToDelete = new ArrayList<>(abortedDirs.size());
+ for (FileStatus stat : abortedDirs) {
+ filesToDelete.add(stat.getPath());
+ }
+ if (filesToDelete.size() < 1) {
+ LOG.warn("Hmm, nothing to delete in the worker for directory " + location +
+ ", that hardly seems right.");
+ return;
+ }
+ LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + location);
+ FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+ for (Path dead : filesToDelete) {
+ LOG.debug("Going to delete path " + dead.toString());
+ fs.delete(dead, true);
+ }
+ }
+
public JobConf getMrJob() {
return mrJob;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index af4a1da..c52bd3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -251,6 +252,11 @@ public class Initiator extends CompactorThread {
private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
StorageDescriptor sd, Map<String, String> tblproperties)
throws IOException, InterruptedException {
+
+ if (MetaStoreUtils.isInsertOnlyTable(tblproperties)) {
+ return CompactionType.MINOR;
+ }
+
boolean noBase = false;
Path location = new Path(sd.getLocation());
FileSystem fs = location.getFileSystem(conf);
http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5786c4f..7a73a17 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -91,7 +91,8 @@ public class TestTxnCommands2 {
NONACIDORCTBL("nonAcidOrcTbl"),
NONACIDPART("nonAcidPart"),
NONACIDPART2("nonAcidPart2"),
- ACIDNESTEDPART("acidNestedPart");
+ ACIDNESTEDPART("acidNestedPart"),
+ MMTBL("mmTbl");
private final String name;
@Override
@@ -143,6 +144,7 @@ public class TestTxnCommands2 {
runStatementOnDriver("create table " + Table.ACIDNESTEDPART +
"(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
" buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
+ runStatementOnDriver("create table " + Table.MMTBL + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
}
protected void dropTables() throws Exception {
@@ -662,11 +664,11 @@ public class TestTxnCommands2 {
FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(buckets);
if (numDelta == 1) {
- Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_0000024_0000024_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numDelta == 2) {
- Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
+ Assert.assertEquals("delta_0000025_0000025_0000", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
@@ -711,7 +713,7 @@ public class TestTxnCommands2 {
Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
} else if (numBase == 2) {
// The new base dir now has two bucket files, since the delta dir has two bucket files
- Assert.assertEquals("base_0000023", status[i].getPath().getName());
+ Assert.assertEquals("base_0000025", status[i].getPath().getName());
Assert.assertEquals(BUCKET_COUNT, buckets.length);
Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
@@ -738,7 +740,7 @@ public class TestTxnCommands2 {
status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
(Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
Assert.assertEquals(1, status.length);
- Assert.assertEquals("base_0000023", status[0].getPath().getName());
+ Assert.assertEquals("base_0000025", status[0].getPath().getName());
FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
Arrays.sort(buckets);
Assert.assertEquals(BUCKET_COUNT, buckets.length);
@@ -1642,6 +1644,64 @@ public class TestTxnCommands2 {
}
/**
+ * Test compaction for Micro-managed table
+ * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables
+ * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any
+ * @throws Exception
+ */
+ @Test
+ public void testMmTableCompaction() throws Exception {
+ // 1. Insert some rows into MM table
+ runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
+ runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)");
+ // There should be 2 delta directories
+ verifyDirAndResult(2);
+
+ // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay.
+ runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
+ runWorker(hiveConf);
+ verifyDirAndResult(2);
+
+ // 3. Let a transaction be aborted
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+ runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)");
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+ // There should be 3 delta directories. The new one is the aborted one.
+ verifyDirAndResult(3);
+
+ // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction.
+ runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
+ runWorker(hiveConf);
+ // The worker should remove the subdir for aborted transaction
+ verifyDirAndResult(2);
+
+ // 5. Run Cleaner. Shouldn't impact anything.
+ runCleaner(hiveConf);
+ verifyDirAndResult(2);
+ }
+
+ private void verifyDirAndResult(int expectedDeltas) throws Exception {
+ FileSystem fs = FileSystem.get(hiveConf);
+ // Verify the content of subdirs
+ FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+ (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+ int sawDeltaTimes = 0;
+ for (int i = 0; i < status.length; i++) {
+ Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+ sawDeltaTimes++;
+ FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+ Assert.assertEquals(1, files.length);
+ Assert.assertTrue(files[0].getPath().getName().equals("000000_0"));
+ }
+ Assert.assertEquals(expectedDeltas, sawDeltaTimes);
+
+ // Verify query result
+ int [][] resultData = new int[][] {{1,2}, {3,4}};
+ List<String> rs = runStatementOnDriver("select a,b from " + Table.MMTBL);
+ Assert.assertEquals(stringifyValues(resultData), rs);
+ }
+
+ /**
* takes raw data and turns it into a string as if from Driver.getResults()
* sorts rows in dictionary order
*/