You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2020/05/05 14:36:17 UTC
[hive] branch master updated: HIVE-23280: Trigger compaction with old aborted txns (Karen Coppage via Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new ba1e5f0 HIVE-23280: Trigger compaction with old aborted txns (Karen Coppage via Peter Vary)
ba1e5f0 is described below
commit ba1e5f057eca9fc61eb1283fb1873593c70c1685
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Tue May 5 16:35:37 2020 +0200
HIVE-23280: Trigger compaction with old aborted txns (Karen Coppage via Peter Vary)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +
.../hadoop/hive/ql/txn/compactor/Initiator.java | 16 ++-
.../metastore/txn/TestCompactionTxnHandler.java | 10 +-
.../hive/ql/txn/compactor/TestInitiator.java | 45 +++++++++
.../hive/metastore/api/CompactionInfoStruct.java | 107 ++++++++++++++++++++-
.../src/gen/thrift/gen-php/metastore/Types.php | 23 +++++
.../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 15 ++-
.../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +-
.../src/main/thrift/hive_metastore.thrift | 1 +
.../hadoop/hive/metastore/txn/CompactionInfo.java | 6 ++
.../hive/metastore/txn/CompactionTxnHandler.java | 57 +++++++----
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 9 +-
12 files changed, 261 insertions(+), 37 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 829791e..61db90c4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2872,6 +2872,11 @@ public class HiveConf extends Configuration {
"Number of aborted transactions involving a given table or partition that will trigger\n" +
"a major compaction."),
+ HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD("hive.compactor.aborted.txn.time.threshold", "12h",
+ new TimeValidator(TimeUnit.HOURS),
+ "Age of table/partition's oldest aborted transaction when compaction will be triggered. " +
+ "Default time unit is: hours. Set to a negative number to disable."),
+
HIVE_COMPACTOR_WAIT_TIMEOUT("hive.compactor.wait.timeout", 300000L, "Time out in "
+ "milliseconds for blocking compaction. It's value has to be higher than 2000 milliseconds. "),
/**
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 23512e2..2557809 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -89,6 +89,9 @@ public class Initiator extends MetaStoreCompactorThread {
int abortedThreshold = HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+ long abortedTimeThreshold = HiveConf
+ .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+ TimeUnit.MILLISECONDS);
// Make sure we run through the loop once before checking to stop as this makes testing
// much easier. The stop value is only for testing anyway and not used when called from
@@ -109,7 +112,8 @@ public class Initiator extends MetaStoreCompactorThread {
//todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, compactionInterval)
+ Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold,
+ abortedTimeThreshold, compactionInterval)
.stream().filter(ci -> checkCompactionElig(ci, currentCompactions)).collect(Collectors.toSet());
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
@@ -271,6 +275,16 @@ public class Initiator extends MetaStoreCompactorThread {
return CompactionType.MAJOR;
}
+ if (ci.hasOldAbort) {
+ HiveConf.ConfVars oldAbortedTimeoutProp =
+ HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+ LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName()
+ + " with age older than threshold " + oldAbortedTimeoutProp + ": " + conf
+ .getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+ + "Initiating minor compaction.");
+ return CompactionType.MINOR;
+ }
+
if (runJobAsSelf(runAs)) {
return determineCompactionType(ci, writeIds, sd, tblproperties);
} else {
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 7069dae..010f9b9 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -410,7 +410,7 @@ public class TestCompactionTxnHandler {
txnHandler.commitTxn(new CommitTxnRequest(txnid));
assertEquals(0, txnHandler.numLocksInLockTable());
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100);
+ Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(100, -1L);
assertEquals(2, potentials.size());
boolean sawMyTable = false, sawYourTable = false;
for (CompactionInfo ci : potentials) {
@@ -422,13 +422,13 @@ public class TestCompactionTxnHandler {
assertTrue(sawMyTable);
assertTrue(sawYourTable);
- potentials = txnHandler.findPotentialCompactions(100, 1);
+ potentials = txnHandler.findPotentialCompactions(100, -1, 1);
assertEquals(2, potentials.size());
//simulate auto-compaction interval
TimeUnit.SECONDS.sleep(2);
- potentials = txnHandler.findPotentialCompactions(100, 1);
+ potentials = txnHandler.findPotentialCompactions(100, -1, 1);
assertEquals(0, potentials.size());
//simulate prev failed compaction
@@ -437,7 +437,7 @@ public class TestCompactionTxnHandler {
CompactionInfo ci = txnHandler.findNextToCompact("fred");
txnHandler.markFailed(ci);
- potentials = txnHandler.findPotentialCompactions(100, 1);
+ potentials = txnHandler.findPotentialCompactions(100, -1, 1);
assertEquals(1, potentials.size());
}
@@ -574,7 +574,7 @@ public class TestCompactionTxnHandler {
txnHandler.addDynamicPartitions(adp);
txnHandler.commitTxn(new CommitTxnRequest(txnId));
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000);
+ Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(1000, -1L);
assertEquals(2, potentials.size());
SortedSet<CompactionInfo> sorted = new TreeSet<CompactionInfo>(potentials);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index e4ff14a..058430f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+
/**
* Tests for the compactor Initiator thread.
*/
@@ -233,6 +234,50 @@ public class TestInitiator extends CompactorTest {
Assert.assertEquals(2, openTxns.getOpen_txnsSize());
}
+ /**
+ * Test that HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD triggers compaction.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void compactExpiredAbortedTxns() throws Exception {
+ Table t = newTable("default", "expiredAbortedTxns", false);
+ // abort a txn
+ long txnid = openTxn();
+ LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
+ comp.setOperationType(DataOperationType.DELETE);
+ comp.setTablename("expiredAbortedTxns");
+ List<LockComponent> components = new ArrayList<LockComponent>(1);
+ components.add(comp);
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnid);
+ txnHandler.lock(req);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+
+ // before setting, check that no compaction is queued
+ initiateAndVerifyCompactionQueueLength(0);
+
+ // negative number disables threshold check
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, -1,
+ TimeUnit.MILLISECONDS);
+ Thread.sleep(1L);
+ initiateAndVerifyCompactionQueueLength(0);
+
+ // set to 1 ms, wait 1 ms, and check that minor compaction is queued
+ conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, 1, TimeUnit.MILLISECONDS);
+ Thread.sleep(1L);
+ ShowCompactResponse rsp = initiateAndVerifyCompactionQueueLength(1);
+ Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+ }
+
+ private ShowCompactResponse initiateAndVerifyCompactionQueueLength(int expectedLength)
+ throws Exception {
+ startInitiator();
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(expectedLength, rsp.getCompactsSize());
+ return rsp;
+ }
+
@Test
public void noCompactWhenNoCompactSet() throws Exception {
Map<String, String> parameters = new HashMap<String, String>(1);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
index 31b6ed4..b338f47 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField START_FIELD_DESC = new org.apache.thrift.protocol.TField("start", org.apache.thrift.protocol.TType.I64, (short)11);
private static final org.apache.thrift.protocol.TField HIGHEST_WRITE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("highestWriteId", org.apache.thrift.protocol.TType.I64, (short)12);
private static final org.apache.thrift.protocol.TField ERROR_MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorMessage", org.apache.thrift.protocol.TType.STRING, (short)13);
+ private static final org.apache.thrift.protocol.TField HASOLDABORT_FIELD_DESC = new org.apache.thrift.protocol.TField("hasoldabort", org.apache.thrift.protocol.TType.BOOL, (short)14);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -71,6 +72,7 @@ import org.slf4j.LoggerFactory;
private long start; // optional
private long highestWriteId; // optional
private String errorMessage; // optional
+ private boolean hasoldabort; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -90,7 +92,8 @@ import org.slf4j.LoggerFactory;
WORKER_ID((short)10, "workerId"),
START((short)11, "start"),
HIGHEST_WRITE_ID((short)12, "highestWriteId"),
- ERROR_MESSAGE((short)13, "errorMessage");
+ ERROR_MESSAGE((short)13, "errorMessage"),
+ HASOLDABORT((short)14, "hasoldabort");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -131,6 +134,8 @@ import org.slf4j.LoggerFactory;
return HIGHEST_WRITE_ID;
case 13: // ERROR_MESSAGE
return ERROR_MESSAGE;
+ case 14: // HASOLDABORT
+ return HASOLDABORT;
default:
return null;
}
@@ -175,8 +180,9 @@ import org.slf4j.LoggerFactory;
private static final int __TOOMANYABORTS_ISSET_ID = 1;
private static final int __START_ISSET_ID = 2;
private static final int __HIGHESTWRITEID_ISSET_ID = 3;
+ private static final int __HASOLDABORT_ISSET_ID = 4;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE};
+ private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE,_Fields.HASOLDABORT};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -206,6 +212,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
tmpMap.put(_Fields.ERROR_MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("errorMessage", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.HASOLDABORT, new org.apache.thrift.meta_data.FieldMetaData("hasoldabort", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionInfoStruct.class, metaDataMap);
}
@@ -263,6 +271,7 @@ import org.slf4j.LoggerFactory;
if (other.isSetErrorMessage()) {
this.errorMessage = other.errorMessage;
}
+ this.hasoldabort = other.hasoldabort;
}
public CompactionInfoStruct deepCopy() {
@@ -288,6 +297,8 @@ import org.slf4j.LoggerFactory;
setHighestWriteIdIsSet(false);
this.highestWriteId = 0;
this.errorMessage = null;
+ setHasoldabortIsSet(false);
+ this.hasoldabort = false;
}
public long getId() {
@@ -593,6 +604,28 @@ import org.slf4j.LoggerFactory;
}
}
+ public boolean isHasoldabort() {
+ return this.hasoldabort;
+ }
+
+ public void setHasoldabort(boolean hasoldabort) {
+ this.hasoldabort = hasoldabort;
+ setHasoldabortIsSet(true);
+ }
+
+ public void unsetHasoldabort() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __HASOLDABORT_ISSET_ID);
+ }
+
+ /** Returns true if field hasoldabort is set (has been assigned a value) and false otherwise */
+ public boolean isSetHasoldabort() {
+ return EncodingUtils.testBit(__isset_bitfield, __HASOLDABORT_ISSET_ID);
+ }
+
+ public void setHasoldabortIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __HASOLDABORT_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case ID:
@@ -699,6 +732,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case HASOLDABORT:
+ if (value == null) {
+ unsetHasoldabort();
+ } else {
+ setHasoldabort((Boolean)value);
+ }
+ break;
+
}
}
@@ -743,6 +784,9 @@ import org.slf4j.LoggerFactory;
case ERROR_MESSAGE:
return getErrorMessage();
+ case HASOLDABORT:
+ return isHasoldabort();
+
}
throw new IllegalStateException();
}
@@ -780,6 +824,8 @@ import org.slf4j.LoggerFactory;
return isSetHighestWriteId();
case ERROR_MESSAGE:
return isSetErrorMessage();
+ case HASOLDABORT:
+ return isSetHasoldabort();
}
throw new IllegalStateException();
}
@@ -914,6 +960,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_hasoldabort = true && this.isSetHasoldabort();
+ boolean that_present_hasoldabort = true && that.isSetHasoldabort();
+ if (this_present_hasoldabort || that_present_hasoldabort) {
+ if (!(this_present_hasoldabort && that_present_hasoldabort))
+ return false;
+ if (this.hasoldabort != that.hasoldabort)
+ return false;
+ }
+
return true;
}
@@ -986,6 +1041,11 @@ import org.slf4j.LoggerFactory;
if (present_errorMessage)
list.add(errorMessage);
+ boolean present_hasoldabort = true && (isSetHasoldabort());
+ list.add(present_hasoldabort);
+ if (present_hasoldabort)
+ list.add(hasoldabort);
+
return list.hashCode();
}
@@ -1127,6 +1187,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetHasoldabort()).compareTo(other.isSetHasoldabort());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetHasoldabort()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.hasoldabort, other.hasoldabort);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1252,6 +1322,12 @@ import org.slf4j.LoggerFactory;
}
first = false;
}
+ if (isSetHasoldabort()) {
+ if (!first) sb.append(", ");
+ sb.append("hasoldabort:");
+ sb.append(this.hasoldabort);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -1417,6 +1493,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 14: // HASOLDABORT
+ if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+ struct.hasoldabort = iprot.readBool();
+ struct.setHasoldabortIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1505,6 +1589,11 @@ import org.slf4j.LoggerFactory;
oprot.writeFieldEnd();
}
}
+ if (struct.isSetHasoldabort()) {
+ oprot.writeFieldBegin(HASOLDABORT_FIELD_DESC);
+ oprot.writeBool(struct.hasoldabort);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1554,7 +1643,10 @@ import org.slf4j.LoggerFactory;
if (struct.isSetErrorMessage()) {
optionals.set(8);
}
- oprot.writeBitSet(optionals, 9);
+ if (struct.isSetHasoldabort()) {
+ optionals.set(9);
+ }
+ oprot.writeBitSet(optionals, 10);
if (struct.isSetPartitionname()) {
oprot.writeString(struct.partitionname);
}
@@ -1582,6 +1674,9 @@ import org.slf4j.LoggerFactory;
if (struct.isSetErrorMessage()) {
oprot.writeString(struct.errorMessage);
}
+ if (struct.isSetHasoldabort()) {
+ oprot.writeBool(struct.hasoldabort);
+ }
}
@Override
@@ -1595,7 +1690,7 @@ import org.slf4j.LoggerFactory;
struct.setTablenameIsSet(true);
struct.type = org.apache.hadoop.hive.metastore.api.CompactionType.findByValue(iprot.readI32());
struct.setTypeIsSet(true);
- BitSet incoming = iprot.readBitSet(9);
+ BitSet incoming = iprot.readBitSet(10);
if (incoming.get(0)) {
struct.partitionname = iprot.readString();
struct.setPartitionnameIsSet(true);
@@ -1632,6 +1727,10 @@ import org.slf4j.LoggerFactory;
struct.errorMessage = iprot.readString();
struct.setErrorMessageIsSet(true);
}
+ if (incoming.get(9)) {
+ struct.hasoldabort = iprot.readBool();
+ struct.setHasoldabortIsSet(true);
+ }
}
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
index e4b0bc7..302c340 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
@@ -22570,6 +22570,10 @@ class CompactionInfoStruct {
* @var string
*/
public $errorMessage = null;
+ /**
+ * @var bool
+ */
+ public $hasoldabort = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -22626,6 +22630,10 @@ class CompactionInfoStruct {
'var' => 'errorMessage',
'type' => TType::STRING,
),
+ 14 => array(
+ 'var' => 'hasoldabort',
+ 'type' => TType::BOOL,
+ ),
);
}
if (is_array($vals)) {
@@ -22668,6 +22676,9 @@ class CompactionInfoStruct {
if (isset($vals['errorMessage'])) {
$this->errorMessage = $vals['errorMessage'];
}
+ if (isset($vals['hasoldabort'])) {
+ $this->hasoldabort = $vals['hasoldabort'];
+ }
}
}
@@ -22781,6 +22792,13 @@ class CompactionInfoStruct {
$xfer += $input->skip($ftype);
}
break;
+ case 14:
+ if ($ftype == TType::BOOL) {
+ $xfer += $input->readBool($this->hasoldabort);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -22859,6 +22877,11 @@ class CompactionInfoStruct {
$xfer += $output->writeString($this->errorMessage);
$xfer += $output->writeFieldEnd();
}
+ if ($this->hasoldabort !== null) {
+ $xfer += $output->writeFieldBegin('hasoldabort', TType::BOOL, 14);
+ $xfer += $output->writeBool($this->hasoldabort);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 1a0fee3..28c971f 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -15685,6 +15685,7 @@ class CompactionInfoStruct:
- start
- highestWriteId
- errorMessage
+ - hasoldabort
"""
thrift_spec = (
@@ -15702,9 +15703,10 @@ class CompactionInfoStruct:
(11, TType.I64, 'start', None, None, ), # 11
(12, TType.I64, 'highestWriteId', None, None, ), # 12
(13, TType.STRING, 'errorMessage', None, None, ), # 13
+ (14, TType.BOOL, 'hasoldabort', None, None, ), # 14
)
- def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None,):
+ def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None, hasoldabort=None,):
self.id = id
self.dbname = dbname
self.tablename = tablename
@@ -15718,6 +15720,7 @@ class CompactionInfoStruct:
self.start = start
self.highestWriteId = highestWriteId
self.errorMessage = errorMessage
+ self.hasoldabort = hasoldabort
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -15793,6 +15796,11 @@ class CompactionInfoStruct:
self.errorMessage = iprot.readString()
else:
iprot.skip(ftype)
+ elif fid == 14:
+ if ftype == TType.BOOL:
+ self.hasoldabort = iprot.readBool()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -15855,6 +15863,10 @@ class CompactionInfoStruct:
oprot.writeFieldBegin('errorMessage', TType.STRING, 13)
oprot.writeString(self.errorMessage)
oprot.writeFieldEnd()
+ if self.hasoldabort is not None:
+ oprot.writeFieldBegin('hasoldabort', TType.BOOL, 14)
+ oprot.writeBool(self.hasoldabort)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -15885,6 +15897,7 @@ class CompactionInfoStruct:
value = (value * 31) ^ hash(self.start)
value = (value * 31) ^ hash(self.highestWriteId)
value = (value * 31) ^ hash(self.errorMessage)
+ value = (value * 31) ^ hash(self.hasoldabort)
return value
def __repr__(self):
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index e6224ec..cdf97fc 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3494,6 +3494,7 @@ class CompactionInfoStruct
START = 11
HIGHESTWRITEID = 12
ERRORMESSAGE = 13
+ HASOLDABORT = 14
FIELDS = {
ID => {:type => ::Thrift::Types::I64, :name => 'id'},
@@ -3508,7 +3509,8 @@ class CompactionInfoStruct
WORKERID => {:type => ::Thrift::Types::STRING, :name => 'workerId', :optional => true},
START => {:type => ::Thrift::Types::I64, :name => 'start', :optional => true},
HIGHESTWRITEID => {:type => ::Thrift::Types::I64, :name => 'highestWriteId', :optional => true},
- ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true}
+ ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true},
+ HASOLDABORT => {:type => ::Thrift::Types::BOOL, :name => 'hasoldabort', :optional => true}
}
def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 8462b3d..c78aeb4 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1167,6 +1167,7 @@ struct CompactionInfoStruct {
11: optional i64 start
12: optional i64 highestWriteId
13: optional string errorMessage
+ 14: optional bool hasoldabort
}
struct OptionalCompactionInfoStruct {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 70d63ab..062a97c 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -50,6 +50,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
public String runAs;
public String properties;
public boolean tooManyAborts = false;
+ public boolean hasOldAbort = false;
/**
* The highest write id that the compaction job will pay attention to.
* {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
@@ -118,6 +119,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
"properties:" + properties + "," +
"runAs:" + runAs + "," +
"tooManyAborts:" + tooManyAborts + "," +
+ "hasOldAbort:" + hasOldAbort + "," +
"highestWriteId:" + highestWriteId + "," +
"errorMessage:" + errorMessage;
}
@@ -193,6 +195,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
if (cr.isSetToomanyaborts()) {
ci.tooManyAborts = cr.isToomanyaborts();
}
+ if (cr.isSetHasoldabort()) {
+ ci.hasOldAbort = cr.isHasoldabort();
+ }
if (cr.isSetState() && cr.getState().length() != 1) {
throw new IllegalStateException("State should only be one character but it was set to " + cr.getState());
} else if (cr.isSetState()) {
@@ -220,6 +225,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
cr.setRunas(ci.runAs);
cr.setProperties(ci.properties);
cr.setToomanyaborts(ci.tooManyAborts);
+ cr.setHasoldabort(ci.hasOldAbort);
cr.setStart(ci.start);
cr.setState(Character.toString(ci.state));
cr.setWorkerId(ci.workerId);
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index a1bc109..d59f863 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.metastore.txn;
import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.util.StringUtils;
@@ -59,13 +58,15 @@ class CompactionTxnHandler extends TxnHandler {
*/
@Override
@RetrySemantics.ReadOnly
- public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException {
- return findPotentialCompactions(abortedThreshold, -1);
+ public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold)
+ throws MetaException {
+ return findPotentialCompactions(abortedThreshold, abortedTimeThreshold, -1);
}
@Override
@RetrySemantics.ReadOnly
- public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException {
+ public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold,
+ long abortedTimeThreshold, long checkInterval) throws MetaException {
Connection dbConn = null;
Set<CompactionInfo> response = new HashSet<>();
Statement stmt = null;
@@ -75,7 +76,8 @@ class CompactionTxnHandler extends TxnHandler {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
// Check for completed transactions
- String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\".\"CTC_PARTITION\" " +
+ final String s = "SELECT DISTINCT \"TC\".\"CTC_DATABASE\", \"TC\".\"CTC_TABLE\", \"TC\"" +
+ ".\"CTC_PARTITION\" " +
"FROM \"COMPLETED_TXN_COMPONENTS\" \"TC\" " + (checkInterval > 0 ?
"LEFT JOIN ( " +
" SELECT \"C1\".* FROM \"COMPLETED_COMPACTIONS\" \"C1\" " +
@@ -101,38 +103,51 @@ class CompactionTxnHandler extends TxnHandler {
}
rs.close();
- // Check for aborted txns
- s = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
- "FROM \"TXNS\", \"TXN_COMPONENTS\" " +
- "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' " +
- "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
- "HAVING COUNT(*) > " + abortedThreshold;
+ // Check for aborted txns: number of aborted txns past threshold and age of aborted txns
+ // past time threshold
+ boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
+ final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\","
+ + "MIN(\"TXN_STARTED\"), COUNT(*)"
+ + "FROM \"TXNS\", \"TXN_COMPONENTS\" "
+ + "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' "
+ + "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\""
+ + (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold);
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
+ LOG.debug("Going to execute query <" + sCheckAborted + ">");
+ rs = stmt.executeQuery(sCheckAborted);
+ long systemTime = System.currentTimeMillis();
while (rs.next()) {
- CompactionInfo info = new CompactionInfo();
- info.dbname = rs.getString(1);
- info.tableName = rs.getString(2);
- info.partName = rs.getString(3);
- info.tooManyAborts = true;
- response.add(info);
+ boolean pastTimeThreshold =
+ checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime;
+ int numAbortedTxns = rs.getInt(5);
+ if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
+ CompactionInfo info = new CompactionInfo();
+ info.dbname = rs.getString(1);
+ info.tableName = rs.getString(2);
+ info.partName = rs.getString(3);
+ info.tooManyAborts = numAbortedTxns > abortedThreshold;
+ info.hasOldAbort = pastTimeThreshold;
+ response.add(info);
+ }
}
LOG.debug("Going to rollback");
dbConn.rollback();
} catch (SQLException e) {
LOG.error("Unable to connect to transaction database " + e.getMessage());
- checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + abortedThreshold + ")");
+ checkRetryable(dbConn, e,
+ "findPotentialCompactions(maxAborted:" + abortedThreshold
+ + ", abortedTimeThreshold:" + abortedTimeThreshold + ")");
} finally {
close(rs, stmt, dbConn);
}
return response;
}
catch (RetryException e) {
- return findPotentialCompactions(abortedThreshold, checkInterval);
+ return findPotentialCompactions(abortedThreshold, abortedTimeThreshold, checkInterval);
}
}
+
/**
* This will grab the next compaction request off of
* the queue, and assign it to the worker.
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index e8ac71a..28f22e6 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
@@ -31,7 +30,6 @@ import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
/**
@@ -315,14 +313,17 @@ public interface TxnStore extends Configurable {
* that may be ready for compaction. Also, look through txns and txn_components tables for
* aborted transactions that we should add to the list.
* @param abortedThreshold number of aborted queries forming a potential compaction request.
+ * @param abortedTimeThreshold age of an aborted txn in milliseconds that will trigger a
+ * potential compaction request.
* @return list of CompactionInfo structs. These will not have id, type,
* or runAs set since these are only potential compactions not actual ones.
*/
@RetrySemantics.ReadOnly
- Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException;
+ Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold) throws MetaException;
@RetrySemantics.ReadOnly
- Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException;
+ Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long abortedTimeThreshold, long checkInterval)
+ throws MetaException;
/**
* This updates COMPACTION_QUEUE. Set runAs username for the case where the request was