Posted to commits@hive.apache.org by we...@apache.org on 2017/05/24 18:06:57 UTC

hive git commit: HIVE-14881 : integrate MM tables into ACID: merge cleaner into ACID threads (Wei Zheng)

Repository: hive
Updated Branches:
  refs/heads/hive-14535 ed29d1b80 -> 93aec1217


HIVE-14881 : integrate MM tables into ACID: merge cleaner into ACID threads (Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/93aec121
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/93aec121
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/93aec121

Branch: refs/heads/hive-14535
Commit: 93aec1217b29d9026d258771bf6bbcd0f2ae8f61
Parents: ed29d1b
Author: Wei Zheng <we...@apache.org>
Authored: Wed May 24 11:05:01 2017 -0700
Committer: Wei Zheng <we...@apache.org>
Committed: Wed May 24 11:05:01 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hive/metastore/txn/TxnUtils.java     |  2 +-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java |  2 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 32 ++++++---
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   |  2 +-
 .../hive/ql/txn/compactor/CompactorMR.java      | 36 ++++++++++
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  6 ++
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 70 ++++++++++++++++++--
 7 files changed, 134 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 6e0070b..b2ced6b 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -99,7 +99,7 @@ public class TxnUtils {
     }
     highWater = minOpenTxn == Long.MAX_VALUE ? highWater : minOpenTxn - 1;
     BitSet bitSet = new BitSet(exceptions.length);
-    bitSet.set(0, bitSet.length()); // for ValidCompactorTxnList, everything in exceptions are aborted
+    bitSet.set(0, exceptions.length); // for ValidCompactorTxnList, everything in exceptions are aborted
     return new ValidCompactorTxnList(exceptions, bitSet, highWater);
   }
 

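A minimal standalone sketch (not part of this commit) of why the old call in TxnUtils was a no-op: a freshly constructed java.util.BitSet has logical length 0 until some bit is set, so bitSet.set(0, bitSet.length()) sets nothing, whereas bitSet.set(0, exceptions.length) marks every exception as aborted, which is what ValidCompactorTxnList expects here.

    import java.util.BitSet;

    public class BitSetLengthDemo {
      public static void main(String[] args) {
        long[] exceptions = {5L, 7L, 9L};

        BitSet before = new BitSet(exceptions.length);
        before.set(0, before.length());            // length() is still 0, so nothing is set
        System.out.println(before.cardinality());  // prints 0

        BitSet after = new BitSet(exceptions.length);
        after.set(0, exceptions.length);           // sets bits 0..2
        System.out.println(after.cardinality());   // prints 3
      }
    }
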
http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index 733595c..f67831e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -1860,7 +1860,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
   private int compact(Hive db, AlterTableSimpleDesc desc) throws HiveException {
 
     Table tbl = db.getTable(desc.getTableName());
-    if (!AcidUtils.isFullAcidTable(tbl)) {
+    if (!AcidUtils.isFullAcidTable(tbl) && !MetaStoreUtils.isInsertOnlyTable(tbl.getParameters())) {
       throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED, tbl.getDbName(),
           tbl.getTableName());
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 902caa3..e723e2f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -534,6 +534,12 @@ public class AcidUtils {
      * more up to date ones.  Not {@code null}.
      */
     List<FileStatus> getObsolete();
+
+    /**
+     * Get the list of directories that has nothing but aborted transactions.
+     * @return the list of aborted directories
+     */
+    List<FileStatus> getAbortedDirectories();
   }
 
   public static class ParsedDelta implements Comparable<ParsedDelta> {
@@ -795,21 +801,22 @@ public class AcidUtils {
                                        boolean useFileIds,
                                        boolean ignoreEmptyFiles
                                        ) throws IOException {
-    return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles);
+    return getAcidState(directory, conf, txnList, Ref.from(useFileIds), ignoreEmptyFiles, null);
   }
 
   public static Directory getAcidState(Path directory,
                                        Configuration conf,
                                        ValidTxnList txnList,
                                        Ref<Boolean> useFileIds,
-                                       boolean ignoreEmptyFiles
-                                       ) throws IOException {
+                                       boolean ignoreEmptyFiles,
+                                       Map<String, String> tblproperties) throws IOException {
     FileSystem fs = directory.getFileSystem(conf);
     // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
     List<FileStatus> originalDirectories = new ArrayList<FileStatus>();
     final List<FileStatus> obsolete = new ArrayList<FileStatus>();
+    final List<FileStatus> abortedDirectories = new ArrayList<>();
     List<HdfsFileStatusWithId> childrenWithId = null;
     Boolean val = useFileIds.value;
     if (val == null || val) {
@@ -829,14 +836,14 @@ public class AcidUtils {
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
-        getChildState(child.getFileStatus(), child, txnList, working,
-            originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles);
+        getChildState(child.getFileStatus(), child, txnList, working, originalDirectories, original,
+            obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
       }
     } else {
       List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
       for (FileStatus child : children) {
-        getChildState(
-            child, null, txnList, working, originalDirectories, original, obsolete, bestBase, ignoreEmptyFiles);
+        getChildState(child, null, txnList, working, originalDirectories, original, obsolete,
+            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties);
       }
     }
 
@@ -946,6 +953,11 @@ public class AcidUtils {
       public List<FileStatus> getObsolete() {
         return obsolete;
       }
+
+      @Override
+      public List<FileStatus> getAbortedDirectories() {
+        return abortedDirectories;
+      }
     };
   }
   /**
@@ -966,7 +978,7 @@ public class AcidUtils {
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidTxnList txnList, List<ParsedDelta> working, List<FileStatus> originalDirectories,
       List<HdfsFileStatusWithId> original, List<FileStatus> obsolete, TxnBase bestBase,
-      boolean ignoreEmptyFiles) throws IOException {
+      boolean ignoreEmptyFiles, List<FileStatus> aborted, Map<String, String> tblproperties) throws IOException {
     Path p = child.getPath();
     String fn = p.getName();
     if (fn.startsWith(BASE_PREFIX) && child.isDir()) {
@@ -995,6 +1007,10 @@ public class AcidUtils {
       String deltaPrefix =
               (fn.startsWith(DELTA_PREFIX)) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
       ParsedDelta delta = parseDelta(child, deltaPrefix);
+      if (tblproperties != null && MetaStoreUtils.isInsertOnlyTable(tblproperties) &&
+          ValidTxnList.RangeResponse.ALL == txnList.isTxnRangeAborted(delta.minTransaction, delta.maxTransaction)) {
+        aborted.add(child);
+      }
       if (txnList.isTxnRangeValid(delta.minTransaction,
           delta.maxTransaction) !=
           ValidTxnList.RangeResponse.NONE) {

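A short usage sketch of the extended getAcidState() overload and the new getAbortedDirectories() accessor added above. The helper name and its parameters are hypothetical; it simply mirrors how the compactor calls the API further down in this commit, under the assumption that the caller already has the table location, configuration, transaction snapshot, and TBLPROPERTIES in hand.

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hive.common.util.Ref;

    public class AbortedDeltaLookup {
      // Hypothetical helper: list the delta directories that contain nothing but
      // aborted transactions, as reported for insert-only (MM) tables.
      static List<FileStatus> abortedDeltas(Path tableDir, Configuration conf,
          ValidTxnList txns, Map<String, String> tblProps) throws IOException {
        AcidUtils.Directory state = AcidUtils.getAcidState(
            tableDir,          // table or partition directory
            conf,              // Hadoop Configuration
            txns,              // snapshot of valid/aborted transactions
            Ref.from(false),   // do not use cached file IDs
            false,             // do not ignore empty files
            tblProps);         // TBLPROPERTIES; enables the MM aborted-delta check
        return state.getAbortedDirectories();
      }
    }
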
http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 8fb7211..db8c23f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1094,7 +1094,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
 
     private AcidDirInfo callInternal() throws IOException {
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, context.conf,
-          context.transactionList, useFileIds, true);
+          context.transactionList, useFileIds, true, null);
       Path base = dirInfo.getBaseDirectory();
       // find the base files (original or new style)
       List<AcidBaseFileInfo> baseFiles = new ArrayList<AcidBaseFileInfo>();

http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index f83b6db..c0b39ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.common.ValidCompactorTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
@@ -70,6 +71,7 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -204,6 +206,16 @@ public class CompactorMR {
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
     }
+
+    // For MM tables we don't need to launch MR jobs as there is no compaction needed.
+    // We just need to delete the directories for aborted transactions.
+    if (MetaStoreUtils.isInsertOnlyTable(t.getParameters())) {
+      LOG.debug("Going to delete directories for aborted transactions for MM table "
+          + t.getDbName() + "." + t.getTableName());
+      removeFiles(conf, sd.getLocation(), txns, t);
+      return;
+    }
+
     JobConf job = createBaseJobConf(conf, jobName, t, sd, txns, ci);
 
     // Figure out and encode what files we need to read.  We do this here (rather than in
@@ -344,6 +356,30 @@ public class CompactorMR {
     HiveConf.setVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT, HiveInputFormat.class.getName());
   }
 
+  // Remove the directories for aborted transactions only
+  private void removeFiles(HiveConf conf, String location, ValidTxnList txnList, Table t)
+      throws IOException {
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(location), conf, txnList,
+        Ref.from(false), false, t.getParameters());
+    // For MM table, we only want to delete delta dirs for aborted txns.
+    List<FileStatus> abortedDirs = dir.getAbortedDirectories();
+    List<Path> filesToDelete = new ArrayList<>(abortedDirs.size());
+    for (FileStatus stat : abortedDirs) {
+      filesToDelete.add(stat.getPath());
+    }
+    if (filesToDelete.size() < 1) {
+      LOG.warn("Hmm, nothing to delete in the worker for directory " + location +
+          ", that hardly seems right.");
+      return;
+    }
+    LOG.info("About to remove " + filesToDelete.size() + " aborted directories from " + location);
+    FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
+    for (Path dead : filesToDelete) {
+      LOG.debug("Going to delete path " + dead.toString());
+      fs.delete(dead, true);
+    }
+  }
+
   public JobConf getMrJob() {
     return mrJob;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index af4a1da..c52bd3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -251,6 +252,11 @@ public class Initiator extends CompactorThread {
   private CompactionType determineCompactionType(CompactionInfo ci, ValidTxnList txns,
                                                  StorageDescriptor sd, Map<String, String> tblproperties)
       throws IOException, InterruptedException {
+
+    if (MetaStoreUtils.isInsertOnlyTable(tblproperties)) {
+      return CompactionType.MINOR;
+    }
+
     boolean noBase = false;
     Path location = new Path(sd.getLocation());
     FileSystem fs = location.getFileSystem(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/93aec121/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 5786c4f..7a73a17 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -91,7 +91,8 @@ public class TestTxnCommands2 {
     NONACIDORCTBL("nonAcidOrcTbl"),
     NONACIDPART("nonAcidPart"),
     NONACIDPART2("nonAcidPart2"),
-    ACIDNESTEDPART("acidNestedPart");
+    ACIDNESTEDPART("acidNestedPart"),
+    MMTBL("mmTbl");
 
     private final String name;
     @Override
@@ -143,6 +144,7 @@ public class TestTxnCommands2 {
     runStatementOnDriver("create table " + Table.ACIDNESTEDPART +
       "(a int, b int) partitioned by (p int, q int) clustered by (a) into " + BUCKET_COUNT +
       " buckets stored as orc TBLPROPERTIES (" + tableProperties + ")");
+    runStatementOnDriver("create table " + Table.MMTBL + "(a int, b int) TBLPROPERTIES ('transactional'='true', 'transactional_properties'='insert_only')");
   }
 
   protected void dropTables() throws Exception {
@@ -662,11 +664,11 @@ public class TestTxnCommands2 {
         FileStatus[] buckets = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
         Arrays.sort(buckets);
         if (numDelta == 1) {
-          Assert.assertEquals("delta_0000022_0000022_0000", status[i].getPath().getName());
+          Assert.assertEquals("delta_0000024_0000024_0000", status[i].getPath().getName());
           Assert.assertEquals(BUCKET_COUNT - 1, buckets.length);
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         } else if (numDelta == 2) {
-          Assert.assertEquals("delta_0000023_0000023_0000", status[i].getPath().getName());
+          Assert.assertEquals("delta_0000025_0000025_0000", status[i].getPath().getName());
           Assert.assertEquals(BUCKET_COUNT, buckets.length);
           Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
           Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
@@ -711,7 +713,7 @@ public class TestTxnCommands2 {
           Assert.assertEquals("bucket_00001", buckets[0].getPath().getName());
         } else if (numBase == 2) {
           // The new base dir now has two bucket files, since the delta dir has two bucket files
-          Assert.assertEquals("base_0000023", status[i].getPath().getName());
+          Assert.assertEquals("base_0000025", status[i].getPath().getName());
           Assert.assertEquals(BUCKET_COUNT, buckets.length);
           Assert.assertEquals("bucket_00000", buckets[0].getPath().getName());
           Assert.assertEquals("bucket_00001", buckets[1].getPath().getName());
@@ -738,7 +740,7 @@ public class TestTxnCommands2 {
     status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
         (Table.NONACIDORCTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
     Assert.assertEquals(1, status.length);
-    Assert.assertEquals("base_0000023", status[0].getPath().getName());
+    Assert.assertEquals("base_0000025", status[0].getPath().getName());
     FileStatus[] buckets = fs.listStatus(status[0].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
     Arrays.sort(buckets);
     Assert.assertEquals(BUCKET_COUNT, buckets.length);
@@ -1642,6 +1644,64 @@ public class TestTxnCommands2 {
   }
 
   /**
+   * Test compaction for Micro-managed table
+   * 1. Regular compaction shouldn't impact any valid subdirectories of MM tables
+   * 2. Compactions will only remove subdirectories for aborted transactions of MM tables, if any
+   * @throws Exception
+   */
+  @Test
+  public void testMmTableCompaction() throws Exception {
+    // 1. Insert some rows into MM table
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(1,2)");
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(3,4)");
+    // There should be 2 delta directories
+    verifyDirAndResult(2);
+
+    // 2. Perform a MINOR compaction. Since nothing was aborted, subdirs should stay.
+    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
+    runWorker(hiveConf);
+    verifyDirAndResult(2);
+
+    // 3. Let a transaction be aborted
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("insert into " + Table.MMTBL + "(a,b) values(5,6)");
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, false);
+    // There should be 3 delta directories. The new one is the aborted one.
+    verifyDirAndResult(3);
+
+    // 4. Perform a MINOR compaction again. This time it will remove the subdir for aborted transaction.
+    runStatementOnDriver("alter table "+ Table.MMTBL + " compact 'MINOR'");
+    runWorker(hiveConf);
+    // The worker should remove the subdir for aborted transaction
+    verifyDirAndResult(2);
+
+    // 5. Run Cleaner. Shouldn't impact anything.
+    runCleaner(hiveConf);
+    verifyDirAndResult(2);
+  }
+
+  private void verifyDirAndResult(int expectedDeltas) throws Exception {
+    FileSystem fs = FileSystem.get(hiveConf);
+    // Verify the content of subdirs
+    FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" +
+        (Table.MMTBL).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER);
+    int sawDeltaTimes = 0;
+    for (int i = 0; i < status.length; i++) {
+      Assert.assertTrue(status[i].getPath().getName().matches("delta_.*"));
+      sawDeltaTimes++;
+      FileStatus[] files = fs.listStatus(status[i].getPath(), FileUtils.STAGING_DIR_PATH_FILTER);
+      Assert.assertEquals(1, files.length);
+      Assert.assertTrue(files[0].getPath().getName().equals("000000_0"));
+    }
+    Assert.assertEquals(expectedDeltas, sawDeltaTimes);
+
+    // Verify query result
+    int [][] resultData = new int[][] {{1,2}, {3,4}};
+    List<String> rs = runStatementOnDriver("select a,b from " + Table.MMTBL);
+    Assert.assertEquals(stringifyValues(resultData), rs);
+  }
+
+  /**
    * takes raw data and turns it into a string as if from Driver.getResults()
    * sorts rows in dictionary order
    */