Posted to commits@hive.apache.org by pv...@apache.org on 2020/05/05 14:36:17 UTC

[hive] branch master updated: HIVE-23280: Trigger compaction with old aborted txns (Karen Coppage via Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ba1e5f0  HIVE-23280: Trigger compaction with old aborted txns (Karen Coppage via Peter Vary)
ba1e5f0 is described below

commit ba1e5f057eca9fc61eb1283fb1873593c70c1685
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Tue May 5 16:35:37 2020 +0200

    HIVE-23280: Trigger compaction with old aborted txns (Karen Coppage via Peter Vary)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |  16 ++-
 .../metastore/txn/TestCompactionTxnHandler.java    |  10 +-
 .../hive/ql/txn/compactor/TestInitiator.java       |  45 +++++++++
 .../hive/metastore/api/CompactionInfoStruct.java   | 107 ++++++++++++++++++++-
 .../src/gen/thrift/gen-php/metastore/Types.php     |  23 +++++
 .../src/gen/thrift/gen-py/hive_metastore/ttypes.py |  15 ++-
 .../src/gen/thrift/gen-rb/hive_metastore_types.rb  |   4 +-
 .../src/main/thrift/hive_metastore.thrift          |   1 +
 .../hadoop/hive/metastore/txn/CompactionInfo.java  |   6 ++
 .../hive/metastore/txn/CompactionTxnHandler.java   |  57 +++++++----
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |   9 +-
 12 files changed, 261 insertions(+), 37 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 829791e..61db90c4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2872,6 +2872,11 @@ public class HiveConf extends Configuration {
         "Number of aborted transactions involving a given table or partition that will trigger\n" +
         "a major compaction."),
 
+    HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD("hive.compactor.aborted.txn.time.threshold", "12h",
+        new TimeValidator(TimeUnit.HOURS),
+        "Age of table/partition's oldest aborted transaction when compaction will be triggered. " +
+        "Default time unit is: hours. Set to a negative number to disable."),
+
     HIVE_COMPACTOR_WAIT_TIMEOUT("hive.compactor.wait.timeout", 300000L, "Time out in "
         + "milliseconds for blocking compaction. It's value has to be higher than 2000 milliseconds. "),
     /**
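
For context, the new property goes through HiveConf's ordinary time-variable accessors, which this patch itself uses below (getTimeVar in Initiator, setTimeVar in TestInitiator). A minimal configuration sketch, not part of the commit, with the 6-hour value chosen arbitrarily:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class AbortedTxnAgeThresholdExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Trigger compaction once a table/partition's oldest aborted txn is over 6 hours old.
        conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
            6, TimeUnit.HOURS);
        // Read it back in milliseconds, as the Initiator does; a negative value disables the check.
        long thresholdMs = conf.getTimeVar(
            HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, TimeUnit.MILLISECONDS);
        System.out.println("aborted-txn age threshold (ms): " + thresholdMs);
      }
    }
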
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 23512e2..2557809 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -89,6 +89,9 @@ public class Initiator extends MetaStoreCompactorThread {
 
       int abortedThreshold = HiveConf.getIntVar(conf,
           HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+      long abortedTimeThreshold = HiveConf
+          .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+              TimeUnit.MILLISECONDS);
 
       // Make sure we run through the loop once before checking to stop as this makes testing
       // much easier.  The stop value is only for testing anyway and not used when called from
@@ -109,7 +112,8 @@ public class Initiator extends MetaStoreCompactorThread {
           //todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
 
-          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, compactionInterval)
+          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold,
+              abortedTimeThreshold, compactionInterval)
               .stream().filter(ci -> checkCompactionElig(ci, currentCompactions)).collect(Collectors.toSet());
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
@@ -271,6 +275,16 @@ public class Initiator extends MetaStoreCompactorThread {
       return CompactionType.MAJOR;
     }
 
+    if (ci.hasOldAbort) {
+      HiveConf.ConfVars oldAbortedTimeoutProp =
+          HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+      LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName()
+          + " with age older than threshold " + oldAbortedTimeoutProp + ": " + conf
+          .getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+          + "Initiating minor compaction.");
+      return CompactionType.MINOR;
+    }
+
     if (runJobAsSelf(runAs)) {
       return determineCompactionType(ci, writeIds, sd, tblproperties);
     } else {
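
Taken together with the MAJOR return just above it (the branch for the existing aborted-count threshold, per the HiveConf description), the type selection now reads roughly as follows. A condensed sketch; only the tooManyAborts and hasOldAbort fields come from CompactionInfo, while chooseType and determineCompactionTypeFromStats are illustrative names:

    CompactionType chooseType(CompactionInfo ci) {
      if (ci.tooManyAborts) {
        return CompactionType.MAJOR;   // existing count-based threshold
      }
      if (ci.hasOldAbort) {
        return CompactionType.MINOR;   // new age-based threshold
      }
      // Otherwise fall through to the existing delta/size heuristics.
      return determineCompactionTypeFromStats(ci);
    }
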
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 7069dae..010f9b9 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -410,7 +410,7 @@ public class TestCompactionTxnHandler {
     txnHandler.commitTxn(new CommitTxnRequest(txnid));
     assertEquals(0, txnHandler.numLocksInLockTable());
 
-    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
+    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100, -1L);
     assertEquals(2, potentials.size());
     boolean sawMyTable = false, sawYourTable = false;
     for (CompactionInfo ci : potentials) {
@@ -422,13 +422,13 @@ public class TestCompactionTxnHandler {
     assertTrue(sawMyTable);
     assertTrue(sawYourTable);
 
-    potentials = txnHandler.findPotentialCompactions(100, 1);
+    potentials = txnHandler.findPotentialCompactions(100, -1, 1);
     assertEquals(2, potentials.size());
 
     //simulate auto-compaction interval
     TimeUnit.SECONDS.sleep(2);
 
-    potentials = txnHandler.findPotentialCompactions(100, 1);
+    potentials = txnHandler.findPotentialCompactions(100, -1, 1);
     assertEquals(0, potentials.size());
 
     //simulate prev failed compaction
@@ -437,7 +437,7 @@ public class TestCompactionTxnHandler {
     CompactionInfo ci = txnHandler.findNextToCompact("fred");
     txnHandler.markFailed(ci);
 
-    potentials = txnHandler.findPotentialCompactions(100, 1);
+    potentials = txnHandler.findPotentialCompactions(100, -1, 1);
     assertEquals(1, potentials.size());
   }
 
@@ -574,7 +574,7 @@ public class TestCompactionTxnHandler {
     txnHandler.addDynamicPartitions(adp);
     txnHandler.commitTxn(new CommitTxnRequest(txnId));
 
-    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000);
+    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000, -1L);
     assertEquals(2, potentials.size());
     SortedSet<CompactionInfo> sorted = new TreeSet<CompactionInfo>(potentials);
 
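The -1 added to each call above is the new abortedTimeThreshold parameter; a negative value disables the age-based check, so these assertions keep their original meaning. A hedged call-site sketch (txnHandler as in the test):

    // 100  -> abortedThreshold: count of aborted txns that forces a compaction request
    // -1L  -> abortedTimeThreshold: negative disables the new age-based check
    // 1    -> checkInterval, unchanged (seconds, judging by the test's 2-second sleep)
    Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100, -1L, 1);
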
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index e4ff14a..058430f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -50,6 +50,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+
 /**
  * Tests for the compactor Initiator thread.
  */
@@ -233,6 +234,50 @@ public class TestInitiator extends CompactorTest {
     Assert.assertEquals(2, openTxns.getOpen_txnsSize());
   }
 
+  /**
+   * Test that HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD triggers compaction.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void compactExpiredAbortedTxns() throws Exception {
+    Table t = newTable("default", "expiredAbortedTxns", false);
+    // abort a txn
+    long txnid = openTxn();
+    LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+    comp.setOperationType(DataOperationType.DELETE);
+    comp.setTablename("expiredAbortedTxns");
+    List<LockComponent> components = new ArrayList<LockComponent>(1);
+    components.add(comp);
+    LockRequest req = new LockRequest(components, "me", "localhost");
+    req.setTxnid(txnid);
+    txnHandler.lock(req);
+    txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+    // before setting, check that no compaction is queued
+    initiateAndVerifyCompactionQueueLength(0);
+
+    // negative number disables threshold check
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, -1,
+        TimeUnit.MILLISECONDS);
+    Thread.sleep(1L);
+    initiateAndVerifyCompactionQueueLength(0);
+
+    // set to 1 ms, wait 1 ms, and check that minor compaction is queued
+    conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, 1, TimeUnit.MILLISECONDS);
+    Thread.sleep(1L);
+    ShowCompactResponse rsp = initiateAndVerifyCompactionQueueLength(1);
+    Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+  }
+
+  private ShowCompactResponse initiateAndVerifyCompactionQueueLength(int expectedLength)
+      throws Exception {
+    startInitiator();
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(expectedLength, rsp.getCompactsSize());
+    return rsp;
+  }
+
   @Test
   public void noCompactWhenNoCompactSet() throws Exception {
     Map<String, String> parameters = new HashMap<String, String>(1);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
index 31b6ed4..b338f47 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
   private static final org.apache.thrift.protocol.TField START_FIELD_DESC = new org.apache.thrift.protocol.TField("start", org.apache.thrift.protocol.TType.I64, (short)11);
   private static final org.apache.thrift.protocol.TField HIGHEST_WRITE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("highestWriteId", org.apache.thrift.protocol.TType.I64, (short)12);
   private static final org.apache.thrift.protocol.TField ERROR_MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorMessage", org.apache.thrift.protocol.TType.STRING, (short)13);
+  private static final org.apache.thrift.protocol.TField HASOLDABORT_FIELD_DESC = new org.apache.thrift.protocol.TField("hasoldabort", org.apache.thrift.protocol.TType.BOOL, (short)14);
 
   private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
   static {
@@ -71,6 +72,7 @@ import org.slf4j.LoggerFactory;
   private long start; // optional
   private long highestWriteId; // optional
   private String errorMessage; // optional
+  private boolean hasoldabort; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -90,7 +92,8 @@ import org.slf4j.LoggerFactory;
     WORKER_ID((short)10, "workerId"),
     START((short)11, "start"),
     HIGHEST_WRITE_ID((short)12, "highestWriteId"),
-    ERROR_MESSAGE((short)13, "errorMessage");
+    ERROR_MESSAGE((short)13, "errorMessage"),
+    HASOLDABORT((short)14, "hasoldabort");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -131,6 +134,8 @@ import org.slf4j.LoggerFactory;
           return HIGHEST_WRITE_ID;
         case 13: // ERROR_MESSAGE
           return ERROR_MESSAGE;
+        case 14: // HASOLDABORT
+          return HASOLDABORT;
         default:
           return null;
       }
@@ -175,8 +180,9 @@ import org.slf4j.LoggerFactory;
   private static final int __TOOMANYABORTS_ISSET_ID = 1;
   private static final int __START_ISSET_ID = 2;
   private static final int __HIGHESTWRITEID_ISSET_ID = 3;
+  private static final int __HASOLDABORT_ISSET_ID = 4;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE};
+  private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE,_Fields.HASOLDABORT};
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -206,6 +212,8 @@ import org.slf4j.LoggerFactory;
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
     tmpMap.put(_Fields.ERROR_MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("errorMessage", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.HASOLDABORT, new org.apache.thrift.meta_data.FieldMetaData("hasoldabort", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionInfoStruct.class, metaDataMap);
   }
@@ -263,6 +271,7 @@ import org.slf4j.LoggerFactory;
     if (other.isSetErrorMessage()) {
       this.errorMessage = other.errorMessage;
     }
+    this.hasoldabort = other.hasoldabort;
   }
 
   public CompactionInfoStruct deepCopy() {
@@ -288,6 +297,8 @@ import org.slf4j.LoggerFactory;
     setHighestWriteIdIsSet(false);
     this.highestWriteId = 0;
     this.errorMessage = null;
+    setHasoldabortIsSet(false);
+    this.hasoldabort = false;
   }
 
   public long getId() {
@@ -593,6 +604,28 @@ import org.slf4j.LoggerFactory;
     }
   }
 
+  public boolean isHasoldabort() {
+    return this.hasoldabort;
+  }
+
+  public void setHasoldabort(boolean hasoldabort) {
+    this.hasoldabort = hasoldabort;
+    setHasoldabortIsSet(true);
+  }
+
+  public void unsetHasoldabort() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __HASOLDABORT_ISSET_ID);
+  }
+
+  /** Returns true if field hasoldabort is set (has been assigned a value) and false otherwise */
+  public boolean isSetHasoldabort() {
+    return EncodingUtils.testBit(__isset_bitfield, __HASOLDABORT_ISSET_ID);
+  }
+
+  public void setHasoldabortIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __HASOLDABORT_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case ID:
@@ -699,6 +732,14 @@ import org.slf4j.LoggerFactory;
       }
       break;
 
+    case HASOLDABORT:
+      if (value == null) {
+        unsetHasoldabort();
+      } else {
+        setHasoldabort((Boolean)value);
+      }
+      break;
+
     }
   }
 
@@ -743,6 +784,9 @@ import org.slf4j.LoggerFactory;
     case ERROR_MESSAGE:
       return getErrorMessage();
 
+    case HASOLDABORT:
+      return isHasoldabort();
+
     }
     throw new IllegalStateException();
   }
@@ -780,6 +824,8 @@ import org.slf4j.LoggerFactory;
       return isSetHighestWriteId();
     case ERROR_MESSAGE:
       return isSetErrorMessage();
+    case HASOLDABORT:
+      return isSetHasoldabort();
     }
     throw new IllegalStateException();
   }
@@ -914,6 +960,15 @@ import org.slf4j.LoggerFactory;
         return false;
     }
 
+    boolean this_present_hasoldabort = true && this.isSetHasoldabort();
+    boolean that_present_hasoldabort = true && that.isSetHasoldabort();
+    if (this_present_hasoldabort || that_present_hasoldabort) {
+      if (!(this_present_hasoldabort && that_present_hasoldabort))
+        return false;
+      if (this.hasoldabort != that.hasoldabort)
+        return false;
+    }
+
     return true;
   }
 
@@ -986,6 +1041,11 @@ import org.slf4j.LoggerFactory;
     if (present_errorMessage)
       list.add(errorMessage);
 
+    boolean present_hasoldabort = true && (isSetHasoldabort());
+    list.add(present_hasoldabort);
+    if (present_hasoldabort)
+      list.add(hasoldabort);
+
     return list.hashCode();
   }
 
@@ -1127,6 +1187,16 @@ import org.slf4j.LoggerFactory;
         return lastComparison;
       }
     }
+    lastComparison = Boolean.valueOf(isSetHasoldabort()).compareTo(other.isSetHasoldabort());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetHasoldabort()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.hasoldabort, other.hasoldabort);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -1252,6 +1322,12 @@ import org.slf4j.LoggerFactory;
       }
       first = false;
     }
+    if (isSetHasoldabort()) {
+      if (!first) sb.append(", ");
+      sb.append("hasoldabort:");
+      sb.append(this.hasoldabort);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -1417,6 +1493,14 @@ import org.slf4j.LoggerFactory;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 14: // HASOLDABORT
+            if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+              struct.hasoldabort = iprot.readBool();
+              struct.setHasoldabortIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -1505,6 +1589,11 @@ import org.slf4j.LoggerFactory;
           oprot.writeFieldEnd();
         }
       }
+      if (struct.isSetHasoldabort()) {
+        oprot.writeFieldBegin(HASOLDABORT_FIELD_DESC);
+        oprot.writeBool(struct.hasoldabort);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -1554,7 +1643,10 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetErrorMessage()) {
         optionals.set(8);
       }
-      oprot.writeBitSet(optionals, 9);
+      if (struct.isSetHasoldabort()) {
+        optionals.set(9);
+      }
+      oprot.writeBitSet(optionals, 10);
       if (struct.isSetPartitionname()) {
         oprot.writeString(struct.partitionname);
       }
@@ -1582,6 +1674,9 @@ import org.slf4j.LoggerFactory;
       if (struct.isSetErrorMessage()) {
         oprot.writeString(struct.errorMessage);
       }
+      if (struct.isSetHasoldabort()) {
+        oprot.writeBool(struct.hasoldabort);
+      }
     }
 
     @Override
@@ -1595,7 +1690,7 @@ import org.slf4j.LoggerFactory;
       struct.setTablenameIsSet(true);
       struct.type = org.apache.hadoop.hive.metastore.api.CompactionType.findByValue(iprot.readI32());
       struct.setTypeIsSet(true);
-      BitSet incoming = iprot.readBitSet(9);
+      BitSet incoming = iprot.readBitSet(10);
       if (incoming.get(0)) {
         struct.partitionname = iprot.readString();
         struct.setPartitionnameIsSet(true);
@@ -1632,6 +1727,10 @@ import org.slf4j.LoggerFactory;
         struct.errorMessage = iprot.readString();
         struct.setErrorMessageIsSet(true);
       }
+      if (incoming.get(9)) {
+        struct.hasoldabort = iprot.readBool();
+        struct.setHasoldabortIsSet(true);
+      }
     }
   }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
index e4b0bc7..302c340 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
@@ -22570,6 +22570,10 @@ class CompactionInfoStruct {
    * @var string
    */
   public $errorMessage = null;
+  /**
+   * @var bool
+   */
+  public $hasoldabort = null;
 
   public function __construct($vals=null) {
     if (!isset(self::$_TSPEC)) {
@@ -22626,6 +22630,10 @@ class CompactionInfoStruct {
           'var' => 'errorMessage',
           'type' => TType::STRING,
           ),
+        14 => array(
+          'var' => 'hasoldabort',
+          'type' => TType::BOOL,
+          ),
         );
     }
     if (is_array($vals)) {
@@ -22668,6 +22676,9 @@ class CompactionInfoStruct {
       if (isset($vals['errorMessage'])) {
         $this->errorMessage = $vals['errorMessage'];
       }
+      if (isset($vals['hasoldabort'])) {
+        $this->hasoldabort = $vals['hasoldabort'];
+      }
     }
   }
 
@@ -22781,6 +22792,13 @@ class CompactionInfoStruct {
             $xfer += $input->skip($ftype);
           }
           break;
+        case 14:
+          if ($ftype == TType::BOOL) {
+            $xfer += $input->readBool($this->hasoldabort);
+          } else {
+            $xfer += $input->skip($ftype);
+          }
+          break;
         default:
           $xfer += $input->skip($ftype);
           break;
@@ -22859,6 +22877,11 @@ class CompactionInfoStruct {
       $xfer += $output->writeString($this->errorMessage);
       $xfer += $output->writeFieldEnd();
     }
+    if ($this->hasoldabort !== null) {
+      $xfer += $output->writeFieldBegin('hasoldabort', TType::BOOL, 14);
+      $xfer += $output->writeBool($this->hasoldabort);
+      $xfer += $output->writeFieldEnd();
+    }
     $xfer += $output->writeFieldStop();
     $xfer += $output->writeStructEnd();
     return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 1a0fee3..28c971f 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -15685,6 +15685,7 @@ class CompactionInfoStruct:
    - start
    - highestWriteId
    - errorMessage
+   - hasoldabort
   """
 
   thrift_spec = (
@@ -15702,9 +15703,10 @@ class CompactionInfoStruct:
     (11, TType.I64, 'start', None, None, ), # 11
     (12, TType.I64, 'highestWriteId', None, None, ), # 12
     (13, TType.STRING, 'errorMessage', None, None, ), # 13
+    (14, TType.BOOL, 'hasoldabort', None, None, ), # 14
   )
 
-  def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None,):
+  def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None, hasoldabort=None,):
     self.id = id
     self.dbname = dbname
     self.tablename = tablename
@@ -15718,6 +15720,7 @@ class CompactionInfoStruct:
     self.start = start
     self.highestWriteId = highestWriteId
     self.errorMessage = errorMessage
+    self.hasoldabort = hasoldabort
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -15793,6 +15796,11 @@ class CompactionInfoStruct:
           self.errorMessage = iprot.readString()
         else:
           iprot.skip(ftype)
+      elif fid == 14:
+        if ftype == TType.BOOL:
+          self.hasoldabort = iprot.readBool()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -15855,6 +15863,10 @@ class CompactionInfoStruct:
       oprot.writeFieldBegin('errorMessage', TType.STRING, 13)
       oprot.writeString(self.errorMessage)
       oprot.writeFieldEnd()
+    if self.hasoldabort is not None:
+      oprot.writeFieldBegin('hasoldabort', TType.BOOL, 14)
+      oprot.writeBool(self.hasoldabort)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -15885,6 +15897,7 @@ class CompactionInfoStruct:
     value = (value * 31) ^ hash(self.start)
     value = (value * 31) ^ hash(self.highestWriteId)
     value = (value * 31) ^ hash(self.errorMessage)
+    value = (value * 31) ^ hash(self.hasoldabort)
     return value
 
   def __repr__(self):
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index e6224ec..cdf97fc 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3494,6 +3494,7 @@ class CompactionInfoStruct
   START = 11
   HIGHESTWRITEID = 12
   ERRORMESSAGE = 13
+  HASOLDABORT = 14
 
   FIELDS = {
     ID => {:type => ::Thrift::Types::I64, :name => 'id'},
@@ -3508,7 +3509,8 @@ class CompactionInfoStruct
     WORKERID => {:type => ::Thrift::Types::STRING, :name => 'workerId', :optional => true},
     START => {:type => ::Thrift::Types::I64, :name => 'start', :optional => true},
     HIGHESTWRITEID => {:type => ::Thrift::Types::I64, :name => 'highestWriteId', :optional => true},
-    ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true}
+    ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true},
+    HASOLDABORT => {:type => ::Thrift::Types::BOOL, :name => 'hasoldabort', :optional => true}
   }
 
   def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 8462b3d..c78aeb4 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1167,6 +1167,7 @@ struct CompactionInfoStruct {
     11: optional i64 start
     12: optional i64 highestWriteId
     13: optional string errorMessage
+    14: optional bool hasoldabort
 }
 
 struct OptionalCompactionInfoStruct {
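
Since field 14 is optional, the generated Java accessors (in the CompactionInfoStruct diff above) follow the usual Thrift isset pattern; a small sketch:

    import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;

    CompactionInfoStruct cis = new CompactionInfoStruct();
    boolean before = cis.isSetHasoldabort();  // false: optional field not yet assigned
    cis.setHasoldabort(true);                 // assigns the value and flips the isset bit
    boolean after = cis.isSetHasoldabort();   // true
    boolean value = cis.isHasoldabort();      // true
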
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 70d63ab..062a97c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -50,6 +50,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
   public String runAs;
   public String properties;
   public boolean tooManyAborts = false;
+  public boolean hasOldAbort = false;
   /**
    * The highest write id that the compaction job will pay attention to.
    * {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL) 
@@ -118,6 +119,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
       "properties:" + properties + "," +
       "runAs:" + runAs + "," +
       "tooManyAborts:" + tooManyAborts + "," +
+      "hasOldAbort:" + hasOldAbort + "," +
       "highestWriteId:" + highestWriteId + "," +
       "errorMessage:" + errorMessage;
   }
@@ -193,6 +195,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     if (cr.isSetToomanyaborts()) {
       ci.tooManyAborts = cr.isToomanyaborts();
     }
+    if (cr.isSetHasoldabort()) {
+      ci.hasOldAbort = cr.isHasoldabort();
+    }
     if (cr.isSetState() && cr.getState().length() != 1) {
       throw new IllegalStateException("State should only be one character but it was set to " + cr.getState());
     } else if (cr.isSetState()) {
@@ -220,6 +225,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
     cr.setRunas(ci.runAs);
     cr.setProperties(ci.properties);
     cr.setToomanyaborts(ci.tooManyAborts);
+    cr.setHasoldabort(ci.hasOldAbort);
     cr.setStart(ci.start);
     cr.setState(Character.toString(ci.state));
     cr.setWorkerId(ci.workerId);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index a1bc109..d59f863 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.metastore.txn;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.util.StringUtils;
@@ -59,13 +58,15 @@ class CompactionTxnHandler extends TxnHandler {
    */
   @Override
   @RetrySemantics.ReadOnly
-  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException {
-    return findPotentialCompactions(abortedThreshold, -1);
+  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold)
+      throws MetaException {
+    return findPotentialCompactions(abortedThreshold, abortedTimeThreshold, -1);
   }
 
   @Override
   @RetrySemantics.ReadOnly
-  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException {
+  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold,
+      long abortedTimeThreshold, long checkInterval) throws MetaException {
     Connection dbConn = null;
     Set<CompactionInfo> response = new HashSet<>();
     Statement stmt = null;
@@ -75,7 +76,8 @@ class CompactionTxnHandler extends TxnHandler {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         // Check for completed transactions
-        String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\".\"CTC_PARTITION\" " +
+        final String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\"" +
+            ".\"CTC_PARTITION\" " +
           "FROM \"COMPLETED_TXN_COMPONENTS\" \"TC\" " + (checkInterval > 0 ?
           "LEFT JOIN ( " +
           "  SELECT \"C1\".* FROM \"COMPLETED_COMPACTIONS\" \"C1\" " +
@@ -101,38 +103,51 @@ class CompactionTxnHandler extends TxnHandler {
         }
         rs.close();
 
-        // Check for aborted txns
-        s = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
-          "FROM \"TXNS\", \"TXN_COMPONENTS\" " +
-          "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " +
-          "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
-          "HAVING COUNT(*) > " + abortedThreshold;
+        // Check for aborted txns: flag a table/partition when the number of its aborted txns
+        // exceeds the count threshold, or its oldest aborted txn exceeds the age threshold
+        boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
+        final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\","
+            + "MIN(\"TXN_STARTED\"), COUNT(*)"
+            + "FROM \"TXNS\", \"TXN_COMPONENTS\" "
+            + "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' "
+            + "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\""
+            + (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold);
 
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
+        LOG.debug("Going to execute query <" + sCheckAborted + ">");
+        rs = stmt.executeQuery(sCheckAborted);
+        long systemTime = System.currentTimeMillis();
         while (rs.next()) {
-          CompactionInfo info = new CompactionInfo();
-          info.dbname = rs.getString(1);
-          info.tableName = rs.getString(2);
-          info.partName = rs.getString(3);
-          info.tooManyAborts = true;
-          response.add(info);
+          boolean pastTimeThreshold =
+              checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime;
+          int numAbortedTxns = rs.getInt(5);
+          if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
+            CompactionInfo info = new CompactionInfo();
+            info.dbname = rs.getString(1);
+            info.tableName = rs.getString(2);
+            info.partName = rs.getString(3);
+            info.tooManyAborts = numAbortedTxns > abortedThreshold;
+            info.hasOldAbort = pastTimeThreshold;
+            response.add(info);
+          }
         }
 
         LOG.debug("Going to rollback");
         dbConn.rollback();
       } catch (SQLException e) {
         LOG.error("Unable to connect to transaction database " + e.getMessage());
-        checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + abortedThreshold + ")");
+        checkRetryable(dbConn, e,
+            "findPotentialCompactions(maxAborted:" + abortedThreshold
+                + ", abortedTimeThreshold:" + abortedTimeThreshold + ")");
       } finally {
         close(rs, stmt, dbConn);
       }
       return response;
     }
     catch (RetryException e) {
-      return findPotentialCompactions(abortedThreshold, checkInterval);
+      return findPotentialCompactions(abortedThreshold, abortedTimeThreshold, checkInterval);
     }
   }
+
   /**
    * This will grab the next compaction request off of
    * the queue, and assign it to the worker.
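
Note the restructuring here: the old query pushed the count check into a HAVING clause, while the new one drops HAVING whenever the age check is enabled and instead fetches MIN("TXN_STARTED") and COUNT(*) per table/partition, evaluating both thresholds in Java. A hedged restatement of that per-row test:

    // Column 4 of the result row is MIN("TXN_STARTED"), column 5 is COUNT(*).
    static boolean eligible(long minTxnStartedMs, int numAbortedTxns,
        int abortedThreshold, long abortedTimeThresholdMs, long nowMs) {
      boolean pastTimeThreshold = abortedTimeThresholdMs >= 0
          && minTxnStartedMs + abortedTimeThresholdMs < nowMs;
      return numAbortedTxns > abortedThreshold || pastTimeThreshold;
    }

    // e.g. one txn aborted 13 hours ago, with the 12h default and count threshold 1000:
    //   eligible(now - 13 * 3_600_000L, 1, 1000, 12 * 3_600_000L, now) == true
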
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index e8ac71a..28f22e6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
@@ -31,7 +30,6 @@ import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 /**
@@ -315,14 +313,17 @@ public interface TxnStore extends Configurable {
    * that may be ready for compaction.  Also, look through txns and txn_components tables for
    * aborted transactions that we should add to the list.
    * @param abortedThreshold  number of aborted queries forming a potential compaction request.
+   * @param abortedTimeThreshold age of an aborted txn in milliseconds that will trigger a
+   *                             potential compaction request.
    * @return list of CompactionInfo structs.  These will not have id, type,
    * or runAs set since these are only potential compactions not actual ones.
    */
   @RetrySemantics.ReadOnly
-  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException;
+  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold) throws MetaException;
 
   @RetrySemantics.ReadOnly
-  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException;
+  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold, long checkInterval)
+      throws MetaException;
 
   /**
    * This updates COMPACTION_QUEUE.  Set runAs username for the case where the request was