You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kl...@apache.org on 2022/03/10 11:06:39 UTC
[hive] branch master updated: HIVE-25943: Introduce compaction cleaner failed attempts threshold (Laszlo Vegh, reviewed by Karen Coppage and Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 800c8e3 HIVE-25943: Introduce compaction cleaner failed attempts threshold (Laszlo Vegh, reviewed by Karen Coppage and Denys Kuzmenko)
800c8e3 is described below
commit 800c8e3782fc9f3a14727ceb7b5817f0fe1413cd
Author: veghlaci05 <90...@users.noreply.github.com>
AuthorDate: Thu Mar 10 12:06:23 2022 +0100
HIVE-25943: Introduce compaction cleaner failed attempts threshold (Laszlo Vegh, reviewed by Karen Coppage and Denys Kuzmenko)
Closes #3034.
---
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 50 +++++---
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 142 +++++++++++++++++++++
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 22 ++++
.../src/gen/thrift/gen-cpp/hive_metastore_types.h | 12 +-
.../hive/metastore/api/CompactionInfoStruct.java | 106 ++++++++++++++-
.../gen-php/metastore/CompactionInfoStruct.php | 24 ++++
.../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 14 +-
.../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 8 ++
.../src/main/thrift/hive_metastore.thrift | 1 +
.../hadoop/hive/metastore/txn/CompactionInfo.java | 28 +++-
.../hive/metastore/txn/CompactionTxnHandler.java | 54 ++++++--
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 9 ++
.../hadoop/hive/metastore/utils/StringableMap.java | 4 +
.../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 3 +-
.../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 3 +
.../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 1 +
.../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 3 +
.../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 3 +-
.../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 3 +
.../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 3 +-
.../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 3 +
.../sql/postgres/hive-schema-4.0.0.postgres.sql | 3 +-
.../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 3 +
.../upgrade-3.1.3000-to-4.0.0.postgres.sql | 3 +
25 files changed, 470 insertions(+), 39 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 353c23a..faef9b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
-import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -86,12 +85,17 @@ import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAM
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getIntVar;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
/**
* A class to clean directories after compactions. This will run in a separate thread.
*/
public class Cleaner extends MetaStoreCompactorThread {
+
static final private String CLASS_NAME = Cleaner.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
private long cleanerCheckInterval = 0;
@@ -120,11 +124,9 @@ public class Cleaner extends MetaStoreCompactorThread {
do {
TxnStore.MutexAPI.LockHandle handle = null;
long startedAt = -1;
- boolean delayedCleanupEnabled = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED);
- long retentionTime = 0;
- if (delayedCleanupEnabled) {
- retentionTime = HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS);
- }
+ long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+ ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+ : 0;
// Make sure nothing escapes this run method and kills the metastore at large,
// so wrap it in a big catch Throwable statement.
@@ -208,14 +210,13 @@ public class Cleaner extends MetaStoreCompactorThread {
if (metricsEnabled) {
perfLogger.perfLogBegin(CLASS_NAME, cleanerMetric);
}
- Optional<String> location = Optional.ofNullable(ci.properties).map(StringableMap::new)
- .map(config -> config.get("location"));
+ final String location = ci.getProperty("location");
Callable<Boolean> cleanUpTask;
Table t = null;
Partition p = null;
- if (!location.isPresent()) {
+ if (location == null) {
t = resolveTable(ci);
if (t == null) {
// The table was dropped before we got around to cleaning it.
@@ -250,12 +251,13 @@ public class Cleaner extends MetaStoreCompactorThread {
txnHandler.markCleanerStart(ci);
if (t != null || ci.partName != null) {
- Table finalT = t; Partition finalP = p;
- String path = location.orElseGet(() -> resolveStorageDescriptor(finalT, finalP).getLocation());
+ String path = location == null
+ ? resolveStorageDescriptor(t, p).getLocation()
+ : location;
boolean dropPartition = ci.partName != null && p == null;
cleanUpTask = () -> removeFiles(path, minOpenTxnGLB, ci, dropPartition);
} else {
- cleanUpTask = () -> removeFiles(location.get(), ci);
+ cleanUpTask = () -> removeFiles(location, ci);
}
Ref<Boolean> removedFiles = Ref.from(false);
@@ -289,14 +291,30 @@ public class Cleaner extends MetaStoreCompactorThread {
if (metricsEnabled) {
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
}
- txnHandler.markFailed(ci);
- } finally {
+ handleCleanerAttemptFailure(ci);
+ } finally {
if (metricsEnabled) {
perfLogger.perfLogEnd(CLASS_NAME, cleanerMetric);
}
}
}
+ private void handleCleanerAttemptFailure(CompactionInfo ci) throws MetaException {
+ long defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
+ int cleanAttempts = 0;
+ if (ci.retryRetention > 0) {
+ cleanAttempts = (int)(Math.log(ci.retryRetention / defaultRetention) / Math.log(2)) + 1;
+ }
+ if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
+ //Mark it as failed if the max attempt threshold is reached.
+ txnHandler.markFailed(ci);
+ } else {
+ //Calculate retry retention time and update record.
+ ci.retryRetention = (long)Math.pow(2, cleanAttempts) * defaultRetention;
+ txnHandler.setCleanerRetryRetentionTimeOnError(ci);
+ }
+ }
+
private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
throws NoSuchTxnException, MetaException {
List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
@@ -485,8 +503,8 @@ public class Cleaner extends MetaStoreCompactorThread {
Path path = new Path(location);
StringBuilder extraDebugInfo = new StringBuilder("[").append(path.getName()).append(",");
- boolean ifPurge = Optional.ofNullable(ci.properties).map(StringableMap::new)
- .map(config -> config.get("ifPurge")).map(Boolean::valueOf).orElse(true);
+ String strIfPurge = ci.getProperty("ifPurge");
+ boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
return remove(location, ci, Collections.singletonList(path), ifPurge,
path.getFileSystem(conf), extraDebugInfo);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 2bb8ace..ec5ca39 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.internal.util.reflection.FieldSetter;
import java.util.ArrayList;
import java.util.Arrays;
@@ -50,9 +51,18 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.nullable;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
/**
* Tests for the compactor Cleaner thread
@@ -67,6 +77,138 @@ public class TestCleaner extends CompactorTest {
}
@Test
+ public void testRetryAfterFailedCleanupDelayEnabled() throws Exception {
+ testRetryAfterFailedCleanup(true);
+ }
+
+ @Test
+ public void testRetryAfterFailedCleanupDelayDisabled() throws Exception {
+ testRetryAfterFailedCleanup(false);
+ }
+
+ public void testRetryAfterFailedCleanup(boolean delayEnabled) throws Exception {
+ conf.setBoolVar(HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED, delayEnabled);
+ conf.setTimeVar(HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 2, TimeUnit.SECONDS);
+ MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 3);
+ MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 100, TimeUnit.MILLISECONDS);
+ String errorMessage = "Тут немає прибирання, сер!";
+
+ Table t = newTable("default", "retry_test", false);
+
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ burnThroughTransactions("default", "retry_test", 25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "retry_test", CompactionType.MAJOR);
+ long compactTxn = compactInTxn(rqst);
+ addBaseFile(t, null, 25L, 25, compactTxn);
+
+ //Prevent cleaner from marking the compaction as cleaned
+ TxnStore mockedHandler = spy(txnHandler);
+ doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
+ for (int i = 1; i < 4; i++) {
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
+
+ cleaner.run();
+
+ // Sleep 100ms longer than the actual retention to make sure the compaciton will be picked up again by the cleaner
+ long sleep =
+ (delayEnabled ? conf.getTimeVar(HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS) : 0) + //delayed start retention time
+ (getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS) * (long)Math.pow(2, i)) + //retry retention time
+ 100;
+ Thread.sleep(sleep);
+
+ // Check retry attempts updated
+ List<CompactionInfo> compcationInfos = txnHandler.findReadyToClean(0, 0);
+ Assert.assertEquals(String.format("Expected %d CompactionInfo, but got %d", 1, compcationInfos.size()), 1, compcationInfos.size());
+ CompactionInfo ci = compcationInfos.get(0);
+ int cleanAttempts = 0;
+ if (ci.retryRetention > 0) {
+ cleanAttempts = (int)(Math.log(ci.retryRetention / getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS)) / Math.log(2)) + 1;
+ }
+ Assert.assertEquals(String.format("Expected %d clean attempts, but got %d", i, cleanAttempts), i, cleanAttempts);
+
+ // Check state is still 'ready for cleaning'
+ ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(String.format("Expected %d CompactionInfo, but got %d", 1, scr.getCompactsSize()),
+ 1, scr.getCompactsSize());
+ ShowCompactResponseElement scre = scr.getCompacts().get(0);
+
+ Assert.assertEquals(String.format("Expected '%s' state, but got '%s'", "ready for cleaning", scre.getState()),
+ "ready for cleaning", scre.getState());
+ Assert.assertEquals(String.format("Expected error message: '%s', but got '%s'", errorMessage, scre.getErrorMessage()),
+ errorMessage, scre.getErrorMessage() );
+
+ }
+
+ //Do a final run to reach the maximum retry attempts, so the state finally should be set to failed
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
+
+ cleaner.run();
+
+ ShowCompactResponse scr = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(String.format("Expected %d CompactionInfo, but got %d", 1, scr.getCompactsSize()),
+ 1, scr.getCompactsSize());
+ ShowCompactResponseElement scre = scr.getCompacts().get(0);
+ Assert.assertEquals(String.format("Expected '%s' state, but got '%s'", "failed", scre.getState()),
+ "failed", scre.getState());
+ Assert.assertEquals(String.format("Expected error message: '%s', but got '%s'", errorMessage, scre.getErrorMessage()),
+ errorMessage, scre.getErrorMessage() );
+ }
+
+ @Test
+ public void testRetentionAfterFailedCleanup() throws Exception {
+ conf.setBoolVar(HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED, false);
+
+ Table t = newTable("default", "retry_test", false);
+
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ burnThroughTransactions("default", "retry_test", 25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "retry_test", CompactionType.MAJOR);
+ long compactTxn = compactInTxn(rqst);
+ addBaseFile(t, null, 25L, 25, compactTxn);
+
+ //Prevent cleaner from marking the compaction as cleaned
+ TxnStore mockedHandler = spy(txnHandler);
+ doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
+
+ //Do a run to fail the clean and set the retention time
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
+
+ cleaner.run();
+
+ AtomicReference<List<CompactionInfo>> reference = new AtomicReference<>();
+ doAnswer(invocation -> {
+ Object o = invocation.callRealMethod();
+ reference.set((List<CompactionInfo>) o);
+ return o;
+ }).when(mockedHandler).findReadyToClean(anyLong(), anyLong());
+
+ //Do a final run and check if the compaction is not picked up again
+ cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
+
+ cleaner.run();
+
+ Assert.assertEquals(0, reference.get().size());
+ }
+
+ @Test
public void cleanupAfterMajorTableCompaction() throws Exception {
Table t = newTable("default", "camtc", false);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index bca4d2c..3952126 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -26921,6 +26921,11 @@ void CompactionInfoStruct::__set_enqueueTime(const int64_t val) {
this->enqueueTime = val;
__isset.enqueueTime = true;
}
+
+void CompactionInfoStruct::__set_retryRetention(const int64_t val) {
+ this->retryRetention = val;
+__isset.retryRetention = true;
+}
std::ostream& operator<<(std::ostream& out, const CompactionInfoStruct& obj)
{
obj.printTo(out);
@@ -27075,6 +27080,14 @@ uint32_t CompactionInfoStruct::read(::apache::thrift::protocol::TProtocol* iprot
xfer += iprot->skip(ftype);
}
break;
+ case 16:
+ if (ftype == ::apache::thrift::protocol::T_I64) {
+ xfer += iprot->readI64(this->retryRetention);
+ this->__isset.retryRetention = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -27171,6 +27184,11 @@ uint32_t CompactionInfoStruct::write(::apache::thrift::protocol::TProtocol* opro
xfer += oprot->writeI64(this->enqueueTime);
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.retryRetention) {
+ xfer += oprot->writeFieldBegin("retryRetention", ::apache::thrift::protocol::T_I64, 16);
+ xfer += oprot->writeI64(this->retryRetention);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -27193,6 +27211,7 @@ void swap(CompactionInfoStruct &a, CompactionInfoStruct &b) {
swap(a.errorMessage, b.errorMessage);
swap(a.hasoldabort, b.hasoldabort);
swap(a.enqueueTime, b.enqueueTime);
+ swap(a.retryRetention, b.retryRetention);
swap(a.__isset, b.__isset);
}
@@ -27212,6 +27231,7 @@ CompactionInfoStruct::CompactionInfoStruct(const CompactionInfoStruct& other981)
errorMessage = other981.errorMessage;
hasoldabort = other981.hasoldabort;
enqueueTime = other981.enqueueTime;
+ retryRetention = other981.retryRetention;
__isset = other981.__isset;
}
CompactionInfoStruct& CompactionInfoStruct::operator=(const CompactionInfoStruct& other982) {
@@ -27230,6 +27250,7 @@ CompactionInfoStruct& CompactionInfoStruct::operator=(const CompactionInfoStruct
errorMessage = other982.errorMessage;
hasoldabort = other982.hasoldabort;
enqueueTime = other982.enqueueTime;
+ retryRetention = other982.retryRetention;
__isset = other982.__isset;
return *this;
}
@@ -27251,6 +27272,7 @@ void CompactionInfoStruct::printTo(std::ostream& out) const {
out << ", " << "errorMessage="; (__isset.errorMessage ? (out << to_string(errorMessage)) : (out << "<null>"));
out << ", " << "hasoldabort="; (__isset.hasoldabort ? (out << to_string(hasoldabort)) : (out << "<null>"));
out << ", " << "enqueueTime="; (__isset.enqueueTime ? (out << to_string(enqueueTime)) : (out << "<null>"));
+ out << ", " << "retryRetention="; (__isset.retryRetention ? (out << to_string(retryRetention)) : (out << "<null>"));
out << ")";
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index fb9f87f..b1bf603 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -10175,7 +10175,7 @@ void swap(CompactionRequest &a, CompactionRequest &b);
std::ostream& operator<<(std::ostream& out, const CompactionRequest& obj);
typedef struct _CompactionInfoStruct__isset {
- _CompactionInfoStruct__isset() : partitionname(false), runas(false), properties(false), toomanyaborts(false), state(false), workerId(false), start(false), highestWriteId(false), errorMessage(false), hasoldabort(false), enqueueTime(false) {}
+ _CompactionInfoStruct__isset() : partitionname(false), runas(false), properties(false), toomanyaborts(false), state(false), workerId(false), start(false), highestWriteId(false), errorMessage(false), hasoldabort(false), enqueueTime(false), retryRetention(false) {}
bool partitionname :1;
bool runas :1;
bool properties :1;
@@ -10187,6 +10187,7 @@ typedef struct _CompactionInfoStruct__isset {
bool errorMessage :1;
bool hasoldabort :1;
bool enqueueTime :1;
+ bool retryRetention :1;
} _CompactionInfoStruct__isset;
class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
@@ -10194,7 +10195,7 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
CompactionInfoStruct(const CompactionInfoStruct&);
CompactionInfoStruct& operator=(const CompactionInfoStruct&);
- CompactionInfoStruct() : id(0), dbname(), tablename(), partitionname(), type((CompactionType::type)0), runas(), properties(), toomanyaborts(0), state(), workerId(), start(0), highestWriteId(0), errorMessage(), hasoldabort(0), enqueueTime(0) {
+ CompactionInfoStruct() : id(0), dbname(), tablename(), partitionname(), type((CompactionType::type)0), runas(), properties(), toomanyaborts(0), state(), workerId(), start(0), highestWriteId(0), errorMessage(), hasoldabort(0), enqueueTime(0), retryRetention(0) {
}
virtual ~CompactionInfoStruct() noexcept;
@@ -10217,6 +10218,7 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
std::string errorMessage;
bool hasoldabort;
int64_t enqueueTime;
+ int64_t retryRetention;
_CompactionInfoStruct__isset __isset;
@@ -10250,6 +10252,8 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
void __set_enqueueTime(const int64_t val);
+ void __set_retryRetention(const int64_t val);
+
bool operator == (const CompactionInfoStruct & rhs) const
{
if (!(id == rhs.id))
@@ -10304,6 +10308,10 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase {
return false;
else if (__isset.enqueueTime && !(enqueueTime == rhs.enqueueTime))
return false;
+ if (__isset.retryRetention != rhs.__isset.retryRetention)
+ return false;
+ else if (__isset.retryRetention && !(retryRetention == rhs.retryRetention))
+ return false;
return true;
}
bool operator != (const CompactionInfoStruct &rhs) const {
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
index b8a1753..229badf 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java
@@ -26,6 +26,7 @@ package org.apache.hadoop.hive.metastore.api;
private static final org.apache.thrift.protocol.TField ERROR_MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorMessage", org.apache.thrift.protocol.TType.STRING, (short)13);
private static final org.apache.thrift.protocol.TField HASOLDABORT_FIELD_DESC = new org.apache.thrift.protocol.TField("hasoldabort", org.apache.thrift.protocol.TType.BOOL, (short)14);
private static final org.apache.thrift.protocol.TField ENQUEUE_TIME_FIELD_DESC = new org.apache.thrift.protocol.TField("enqueueTime", org.apache.thrift.protocol.TType.I64, (short)15);
+ private static final org.apache.thrift.protocol.TField RETRY_RETENTION_FIELD_DESC = new org.apache.thrift.protocol.TField("retryRetention", org.apache.thrift.protocol.TType.I64, (short)16);
private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionInfoStructStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionInfoStructTupleSchemeFactory();
@@ -45,6 +46,7 @@ package org.apache.hadoop.hive.metastore.api;
private @org.apache.thrift.annotation.Nullable java.lang.String errorMessage; // optional
private boolean hasoldabort; // optional
private long enqueueTime; // optional
+ private long retryRetention; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -66,7 +68,8 @@ package org.apache.hadoop.hive.metastore.api;
HIGHEST_WRITE_ID((short)12, "highestWriteId"),
ERROR_MESSAGE((short)13, "errorMessage"),
HASOLDABORT((short)14, "hasoldabort"),
- ENQUEUE_TIME((short)15, "enqueueTime");
+ ENQUEUE_TIME((short)15, "enqueueTime"),
+ RETRY_RETENTION((short)16, "retryRetention");
private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
@@ -112,6 +115,8 @@ package org.apache.hadoop.hive.metastore.api;
return HASOLDABORT;
case 15: // ENQUEUE_TIME
return ENQUEUE_TIME;
+ case 16: // RETRY_RETENTION
+ return RETRY_RETENTION;
default:
return null;
}
@@ -159,8 +164,9 @@ package org.apache.hadoop.hive.metastore.api;
private static final int __HIGHESTWRITEID_ISSET_ID = 3;
private static final int __HASOLDABORT_ISSET_ID = 4;
private static final int __ENQUEUETIME_ISSET_ID = 5;
+ private static final int __RETRYRETENTION_ISSET_ID = 6;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE,_Fields.HASOLDABORT,_Fields.ENQUEUE_TIME};
+ private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE,_Fields.HASOLDABORT,_Fields.ENQUEUE_TIME,_Fields.RETRY_RETENTION};
public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -194,6 +200,8 @@ package org.apache.hadoop.hive.metastore.api;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
tmpMap.put(_Fields.ENQUEUE_TIME, new org.apache.thrift.meta_data.FieldMetaData("enqueueTime", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+ tmpMap.put(_Fields.RETRY_RETENTION, new org.apache.thrift.meta_data.FieldMetaData("retryRetention", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionInfoStruct.class, metaDataMap);
}
@@ -253,6 +261,7 @@ package org.apache.hadoop.hive.metastore.api;
}
this.hasoldabort = other.hasoldabort;
this.enqueueTime = other.enqueueTime;
+ this.retryRetention = other.retryRetention;
}
public CompactionInfoStruct deepCopy() {
@@ -282,6 +291,8 @@ package org.apache.hadoop.hive.metastore.api;
this.hasoldabort = false;
setEnqueueTimeIsSet(false);
this.enqueueTime = 0;
+ setRetryRetentionIsSet(false);
+ this.retryRetention = 0;
}
public long getId() {
@@ -640,6 +651,28 @@ package org.apache.hadoop.hive.metastore.api;
__isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __ENQUEUETIME_ISSET_ID, value);
}
+ public long getRetryRetention() {
+ return this.retryRetention;
+ }
+
+ public void setRetryRetention(long retryRetention) {
+ this.retryRetention = retryRetention;
+ setRetryRetentionIsSet(true);
+ }
+
+ public void unsetRetryRetention() {
+ __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __RETRYRETENTION_ISSET_ID);
+ }
+
+ /** Returns true if field retryRetention is set (has been assigned a value) and false otherwise */
+ public boolean isSetRetryRetention() {
+ return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __RETRYRETENTION_ISSET_ID);
+ }
+
+ public void setRetryRetentionIsSet(boolean value) {
+ __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __RETRYRETENTION_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
case ID:
@@ -762,6 +795,14 @@ package org.apache.hadoop.hive.metastore.api;
}
break;
+ case RETRY_RETENTION:
+ if (value == null) {
+ unsetRetryRetention();
+ } else {
+ setRetryRetention((java.lang.Long)value);
+ }
+ break;
+
}
}
@@ -813,6 +854,9 @@ package org.apache.hadoop.hive.metastore.api;
case ENQUEUE_TIME:
return getEnqueueTime();
+ case RETRY_RETENTION:
+ return getRetryRetention();
+
}
throw new java.lang.IllegalStateException();
}
@@ -854,6 +898,8 @@ package org.apache.hadoop.hive.metastore.api;
return isSetHasoldabort();
case ENQUEUE_TIME:
return isSetEnqueueTime();
+ case RETRY_RETENTION:
+ return isSetRetryRetention();
}
throw new java.lang.IllegalStateException();
}
@@ -1006,6 +1052,15 @@ package org.apache.hadoop.hive.metastore.api;
return false;
}
+ boolean this_present_retryRetention = true && this.isSetRetryRetention();
+ boolean that_present_retryRetention = true && that.isSetRetryRetention();
+ if (this_present_retryRetention || that_present_retryRetention) {
+ if (!(this_present_retryRetention && that_present_retryRetention))
+ return false;
+ if (this.retryRetention != that.retryRetention)
+ return false;
+ }
+
return true;
}
@@ -1071,6 +1126,10 @@ package org.apache.hadoop.hive.metastore.api;
if (isSetEnqueueTime())
hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(enqueueTime);
+ hashCode = hashCode * 8191 + ((isSetRetryRetention()) ? 131071 : 524287);
+ if (isSetRetryRetention())
+ hashCode = hashCode * 8191 + org.apache.thrift.TBaseHelper.hashCode(retryRetention);
+
return hashCode;
}
@@ -1232,6 +1291,16 @@ package org.apache.hadoop.hive.metastore.api;
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.compare(isSetRetryRetention(), other.isSetRetryRetention());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetRetryRetention()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.retryRetention, other.retryRetention);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -1370,6 +1439,12 @@ package org.apache.hadoop.hive.metastore.api;
sb.append(this.enqueueTime);
first = false;
}
+ if (isSetRetryRetention()) {
+ if (!first) sb.append(", ");
+ sb.append("retryRetention:");
+ sb.append(this.retryRetention);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -1551,6 +1626,14 @@ package org.apache.hadoop.hive.metastore.api;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 16: // RETRY_RETENTION
+ if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+ struct.retryRetention = iprot.readI64();
+ struct.setRetryRetentionIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -1649,6 +1732,11 @@ package org.apache.hadoop.hive.metastore.api;
oprot.writeI64(struct.enqueueTime);
oprot.writeFieldEnd();
}
+ if (struct.isSetRetryRetention()) {
+ oprot.writeFieldBegin(RETRY_RETENTION_FIELD_DESC);
+ oprot.writeI64(struct.retryRetention);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -1704,7 +1792,10 @@ package org.apache.hadoop.hive.metastore.api;
if (struct.isSetEnqueueTime()) {
optionals.set(10);
}
- oprot.writeBitSet(optionals, 11);
+ if (struct.isSetRetryRetention()) {
+ optionals.set(11);
+ }
+ oprot.writeBitSet(optionals, 12);
if (struct.isSetPartitionname()) {
oprot.writeString(struct.partitionname);
}
@@ -1738,6 +1829,9 @@ package org.apache.hadoop.hive.metastore.api;
if (struct.isSetEnqueueTime()) {
oprot.writeI64(struct.enqueueTime);
}
+ if (struct.isSetRetryRetention()) {
+ oprot.writeI64(struct.retryRetention);
+ }
}
@Override
@@ -1751,7 +1845,7 @@ package org.apache.hadoop.hive.metastore.api;
struct.setTablenameIsSet(true);
struct.type = org.apache.hadoop.hive.metastore.api.CompactionType.findByValue(iprot.readI32());
struct.setTypeIsSet(true);
- java.util.BitSet incoming = iprot.readBitSet(11);
+ java.util.BitSet incoming = iprot.readBitSet(12);
if (incoming.get(0)) {
struct.partitionname = iprot.readString();
struct.setPartitionnameIsSet(true);
@@ -1796,6 +1890,10 @@ package org.apache.hadoop.hive.metastore.api;
struct.enqueueTime = iprot.readI64();
struct.setEnqueueTimeIsSet(true);
}
+ if (incoming.get(11)) {
+ struct.retryRetention = iprot.readI64();
+ struct.setRetryRetentionIsSet(true);
+ }
}
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php
index 95fd83b..99343fe 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php
@@ -97,6 +97,11 @@ class CompactionInfoStruct
'isRequired' => false,
'type' => TType::I64,
),
+ 16 => array(
+ 'var' => 'retryRetention',
+ 'isRequired' => false,
+ 'type' => TType::I64,
+ ),
);
/**
@@ -159,6 +164,10 @@ class CompactionInfoStruct
* @var int
*/
public $enqueueTime = null;
+ /**
+ * @var int
+ */
+ public $retryRetention = null;
public function __construct($vals = null)
{
@@ -208,6 +217,9 @@ class CompactionInfoStruct
if (isset($vals['enqueueTime'])) {
$this->enqueueTime = $vals['enqueueTime'];
}
+ if (isset($vals['retryRetention'])) {
+ $this->retryRetention = $vals['retryRetention'];
+ }
}
}
@@ -335,6 +347,13 @@ class CompactionInfoStruct
$xfer += $input->skip($ftype);
}
break;
+ case 16:
+ if ($ftype == TType::I64) {
+ $xfer += $input->readI64($this->retryRetention);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -424,6 +443,11 @@ class CompactionInfoStruct
$xfer += $output->writeI64($this->enqueueTime);
$xfer += $output->writeFieldEnd();
}
+ if ($this->retryRetention !== null) {
+ $xfer += $output->writeFieldBegin('retryRetention', TType::I64, 16);
+ $xfer += $output->writeI64($this->retryRetention);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index f10a30f..3eb49d5 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -15397,11 +15397,12 @@ class CompactionInfoStruct(object):
- errorMessage
- hasoldabort
- enqueueTime
+ - retryRetention
"""
- def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None, hasoldabort=None, enqueueTime=None,):
+ def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None, hasoldabort=None, enqueueTime=None, retryRetention=None,):
self.id = id
self.dbname = dbname
self.tablename = tablename
@@ -15417,6 +15418,7 @@ class CompactionInfoStruct(object):
self.errorMessage = errorMessage
self.hasoldabort = hasoldabort
self.enqueueTime = enqueueTime
+ self.retryRetention = retryRetention
def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -15502,6 +15504,11 @@ class CompactionInfoStruct(object):
self.enqueueTime = iprot.readI64()
else:
iprot.skip(ftype)
+ elif fid == 16:
+ if ftype == TType.I64:
+ self.retryRetention = iprot.readI64()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -15572,6 +15579,10 @@ class CompactionInfoStruct(object):
oprot.writeFieldBegin('enqueueTime', TType.I64, 15)
oprot.writeI64(self.enqueueTime)
oprot.writeFieldEnd()
+ if self.retryRetention is not None:
+ oprot.writeFieldBegin('retryRetention', TType.I64, 16)
+ oprot.writeI64(self.retryRetention)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -30752,6 +30763,7 @@ CompactionInfoStruct.thrift_spec = (
(13, TType.STRING, 'errorMessage', 'UTF8', None, ), # 13
(14, TType.BOOL, 'hasoldabort', None, None, ), # 14
(15, TType.I64, 'enqueueTime', None, None, ), # 15
+ (16, TType.I64, 'retryRetention', None, None, ), # 16
)
all_structs.append(OptionalCompactionInfoStruct)
OptionalCompactionInfoStruct.thrift_spec = (
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index dfd1afa..f4c52a7 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -4489,6 +4489,7 @@ class CompactionInfoStruct
ERRORMESSAGE = 13
HASOLDABORT = 14
ENQUEUETIME = 15
+ RETRYRETENTION = 16
FIELDS = {
ID => {:type => ::Thrift::Types::I64, :name => 'id'},
@@ -4505,7 +4506,8 @@ class CompactionInfoStruct
HIGHESTWRITEID => {:type => ::Thrift::Types::I64, :name => 'highestWriteId', :optional => true},
ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true},
HASOLDABORT => {:type => ::Thrift::Types::BOOL, :name => 'hasoldabort', :optional => true},
- ENQUEUETIME => {:type => ::Thrift::Types::I64, :name => 'enqueueTime', :optional => true}
+ ENQUEUETIME => {:type => ::Thrift::Types::I64, :name => 'enqueueTime', :optional => true},
+ RETRYRETENTION => {:type => ::Thrift::Types::I64, :name => 'retryRetention', :optional => true}
}
def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 07420eb..ca6e83d 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -613,6 +613,14 @@ public class MetastoreConf {
"metastore.compactor.enable.stats.compression",
"metastore.compactor.enable.stats.compression", true,
"Can be used to disable compression and ORC indexes for files produced by minor compaction."),
+ HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS("hive.compactor.cleaner.retry.maxattempts",
+ "hive.compactor.cleaner.retry.maxattempts", 5, new RangeValidator(0, 10),
+ "Maximum number of attempts to clean a table again after a failed cycle. Must be between 0 and 10," +
+ "where 0 means the feature is turned off, the cleaner wont't retry after a failed cycle."),
+ HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME("hive.compactor.cleaner.retry.retention.time",
+ "hive.compactor.cleaner.retry.retentionTime", 300, TimeUnit.SECONDS, new TimeValidator(TimeUnit.SECONDS),
+ "Initial value of the cleaner retry retention time. The delay has a backoff, and calculated the following way: " +
+ "pow(2, number_of_failed_attempts) * HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME."),
CONNECTION_DRIVER("javax.jdo.option.ConnectionDriverName",
"javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver",
"Driver class name for a JDBC metastore"),
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 7093587..a03e3c9 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1297,6 +1297,7 @@ struct CompactionInfoStruct {
13: optional string errorMessage
14: optional bool hasoldabort
15: optional i64 enqueueTime
+ 16: optional i64 retryRetention
}
struct OptionalCompactionInfoStruct {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 85da714..7ac4238 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.OptionalCompactionInfoStruct;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
+import org.apache.hadoop.hive.metastore.utils.StringableMap;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -57,6 +58,8 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
public String properties;
public boolean tooManyAborts = false;
public boolean hasOldAbort = false;
+ public long retryRetention = 0;
+
/**
* The highest write id that the compaction job will pay attention to.
* {@code 0} means it wasn't set (e.g. in case of upgrades, since ResultSet.getLong() will return 0 if field is NULL)
@@ -73,6 +76,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
private String fullPartitionName = null;
private String fullTableName = null;
+ private StringableMap propertiesMap;
public CompactionInfo(String dbname, String tableName, String partName, CompactionType type) {
this.dbname = dbname;
@@ -87,6 +91,21 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
}
CompactionInfo() {}
+ public String getProperty(String key) {
+ if (propertiesMap == null) {
+ propertiesMap = new StringableMap(properties);
+ }
+ return propertiesMap.get(key);
+ }
+
+ public void setProperty(String key, String value) {
+ if (propertiesMap == null) {
+ propertiesMap = new StringableMap(properties);
+ }
+ propertiesMap.put(key, value);
+ properties = propertiesMap.toString();
+ }
+
public String getFullPartitionName() {
if (fullPartitionName == null) {
StringBuilder buf = new StringBuilder(dbname);
@@ -134,7 +153,8 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
"highestWriteId:" + highestWriteId + "," +
"errorMessage:" + errorMessage + "," +
"workerId: " + workerId + "," +
- "initiatorId: " + initiatorId;
+ "initiatorId: " + initiatorId + "," +
+ "retryRetention" + retryRetention;
}
@Override
@@ -181,6 +201,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
fullCi.workerVersion = rs.getString(16);
fullCi.initiatorId = rs.getString(17);
fullCi.initiatorVersion = rs.getString(18);
+ fullCi.retryRetention = rs.getLong(19);
return fullCi;
}
static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException, MetaException {
@@ -237,6 +258,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
if (cr.isSetEnqueueTime()) {
ci.enqueueTime = cr.getEnqueueTime();
}
+ if (cr.isSetRetryRetention()) {
+ ci.retryRetention = cr.getRetryRetention();
+ }
return ci;
}
@@ -256,7 +280,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> {
cr.setHighestWriteId(ci.highestWriteId);
cr.setErrorMessage(ci.errorMessage);
cr.setEnqueueTime(ci.enqueueTime);
-
+ cr.setRetryRetention(ci.retryRetention);
return cr;
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index e691664..439795a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -59,7 +59,8 @@ class CompactionTxnHandler extends TxnHandler {
"SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
+ "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
+ "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", "
- + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_TXN_ID\" = ?";
+ + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
+ + "\"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_TXN_ID\" = ?";
private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY =
"SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " +
"WHERE \"CMC_DATABASE\" = ? AND \"CMC_TABLE\" = ? AND \"CMC_METRIC_TYPE\" = ?";
@@ -334,7 +335,6 @@ class CompactionTxnHandler extends TxnHandler {
* Find entries in the queue that are ready to
* be cleaned.
* @param minOpenTxnWaterMark Minimum open txnId
- * @param retentionTime time in milliseconds to delay cleanup after compaction
* @return information on the entry in the queue.
*/
@Override
@@ -353,11 +353,9 @@ class CompactionTxnHandler extends TxnHandler {
if (minOpenTxnWaterMark > 0) {
whereClause += " AND (\"CQ_NEXT_TXN_ID\" <= " + minOpenTxnWaterMark + " OR \"CQ_NEXT_TXN_ID\" IS NULL)";
}
- if (retentionTime > 0) {
- whereClause += " AND (\"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - " + retentionTime + ") OR \"CQ_COMMIT_TIME\" IS NULL)";
- }
+ whereClause += " AND (\"CQ_COMMIT_TIME\" < (" + getEpochFn(dbProduct) + " - CQ_RETRY_RETENTION - " + retentionTime + ") OR \"CQ_COMMIT_TIME\" IS NULL)";
String s = "SELECT \"CQ_ID\", \"cq1\".\"CQ_DATABASE\", \"cq1\".\"CQ_TABLE\", \"cq1\".\"CQ_PARTITION\"," +
- " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\"" +
+ " \"CQ_TYPE\", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_TBLPROPERTIES\", \"CQ_RETRY_RETENTION\" " +
" FROM \"COMPACTION_QUEUE\" \"cq1\" " +
"INNER JOIN (" +
" SELECT MIN(\"CQ_HIGHEST_WRITE_ID\") \"WRITE_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\"" +
@@ -384,8 +382,9 @@ class CompactionTxnHandler extends TxnHandler {
info.runAs = rs.getString(6);
info.highestWriteId = rs.getLong(7);
info.properties = rs.getString(8);
+ info.retryRetention = rs.getInt(9);
if (LOG.isDebugEnabled()) {
- LOG.debug("Found ready to clean: " + info.toString());
+ LOG.debug("Found ready to clean: " + info);
}
rc.add(info);
}
@@ -1339,8 +1338,8 @@ class CompactionTxnHandler extends TxnHandler {
pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
+ "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
+ "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", "
- + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" "
- + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?");
+ + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
+ + "\"CQ_RETRY_RETENTION\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?");
pStmt.setLong(1, ci.id);
rs = pStmt.executeQuery();
if (rs.next()) {
@@ -1430,6 +1429,43 @@ class CompactionTxnHandler extends TxnHandler {
updateStatus(info);
}
+
+ @Override
+ @RetrySemantics.CannotRetry
+ public void setCleanerRetryRetentionTimeOnError(CompactionInfo info) throws MetaException {
+ try {
+ try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED)) {
+ try (PreparedStatement stmt = dbConn.prepareStatement("UPDATE \"COMPACTION_QUEUE\" " +
+ "SET \"CQ_RETRY_RETENTION\" = ?, \"CQ_ERROR_MESSAGE\"= ? WHERE \"CQ_ID\" = ?")) {
+ stmt.setLong(1, info.retryRetention);
+ stmt.setString(2, info.errorMessage);
+ stmt.setLong(3, info.id);
+ int updCnt = stmt.executeUpdate();
+ if (updCnt != 1) {
+ LOG.error("Unable to update compaction queue record: " + info + ". updCnt=" + updCnt);
+ dbConn.rollback();
+ throw new MetaException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.error("Unable to update compaction queue: " + e.getMessage());
+ rollbackDBConn(dbConn);
+ checkRetryable(e, "setCleanerRetryRetentionTimeOnError(" + info + ")");
+ throw new MetaException("Unable to update compaction queue: " +
+ StringUtils.stringifyException(e));
+ }
+ } catch (SQLException e) {
+ LOG.error("Unable to connect to transaction database: " + e.getMessage());
+ checkRetryable(e, "setCleanerRetryRetentionTimeOnError(" + info + ")");
+ throw new MetaException("Unable to connect to transaction database: " +
+ StringUtils.stringifyException(e));
+ }
+ } catch (RetryException e) {
+ setCleanerRetryRetentionTimeOnError(info);
+ }
+ }
+
@Override
@RetrySemantics.Idempotent
public void setHadoopJobId(String hadoopJobId, long id) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index c55f534..e038b90 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -523,6 +523,15 @@ public interface TxnStore extends Configurable {
void markRefused(CompactionInfo info) throws MetaException;
/**
+ * Stores the value of {@link CompactionInfo#retryRetention} and {@link CompactionInfo#errorMessage} fields
+ * of the CompactionInfo in the HMS database.
+ * @param info The {@link CompactionInfo} object holding the values.
+ * @throws MetaException
+ */
+ @RetrySemantics.CannotRetry
+ void setCleanerRetryRetentionTimeOnError(CompactionInfo info) throws MetaException;
+
+ /**
* Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
* min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
*/
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java
index b3f1749..d00c656 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/StringableMap.java
@@ -30,6 +30,10 @@ import java.util.Properties;
public class StringableMap extends HashMap<String, String> {
public StringableMap(String s) {
+ super();
+ if (s == null || s.isEmpty()) {
+ return;
+ }
String[] parts = s.split(":", 2);
// read that many chars
int numElements = Integer.parseInt(parts[0]);
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 1fda476..8c80524 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -629,7 +629,8 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_INITIATOR_ID varchar(128),
CQ_INITIATOR_VERSION varchar(128),
CQ_WORKER_VERSION varchar(128),
- CQ_CLEANER_START bigint
+ CQ_CLEANER_START bigint,
+ CQ_RETRY_RETENTION bigint NOT NULL DEFAULT 0
);
CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 25c7573..4891c6e 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -195,5 +195,8 @@ CREATE TABLE COMPACTION_METRICS_CACHE (
CMC_VERSION integer NOT NULL
);
+-- HIVE-25993
+ALTER TABLE "APP"."COMPACTION_QUEUE" ADD COLUMN "CQ_RETRY_RETENTION" bigint NOT NULL DEFAULT 0;
+
-- This needs to be the last thing done. Insert any changes above this line.
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 3301eba..074c5bb 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1049,6 +1049,7 @@ CREATE TABLE COMPACTION_QUEUE(
CQ_INITIATOR_VERSION nvarchar(128) NULL,
CQ_WORKER_VERSION nvarchar(128) NULL,
CQ_CLEANER_START bigint NULL,
+ CQ_RETRY_RETENTION bigint NOT NULL DEFAULT 0,
PRIMARY KEY CLUSTERED
(
CQ_ID ASC
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 21d6120..a3547d2 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -246,6 +246,9 @@ CREATE TABLE COMPACTION_METRICS_CACHE (
CMC_VERSION int NOT NULL
);
+-- HIVE-25993
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_RETRY_RETENTION" bigint NOT NULL DEFAULT 0;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index e4b86b5..0c9dbce 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1090,7 +1090,8 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_INITIATOR_ID varchar(128),
CQ_INITIATOR_VERSION varchar(128),
CQ_WORKER_VERSION varchar(128),
- CQ_CLEANER_START bigint
+ CQ_CLEANER_START bigint,
+ CQ_RETRY_RETENTION bigint NOT NULL DEFAULT 0
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
CREATE TABLE COMPLETED_COMPACTIONS (
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 090090e..6233b79 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -223,6 +223,9 @@ CREATE TABLE COMPACTION_METRICS_CACHE (
CMC_VERSION int NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+-- HIVE-25993
+ALTER TABLE `COMPACTION_QUEUE` ADD COLUMN `CQ_RETRY_RETENTION` bigint NOT NULL DEFAULT 0;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index 726fd34..e9078ac 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1092,7 +1092,8 @@ CREATE TABLE COMPACTION_QUEUE (
CQ_INITIATOR_ID varchar(128),
CQ_INITIATOR_VERSION varchar(128),
CQ_WORKER_VERSION varchar(128),
- CQ_CLEANER_START NUMBER(19)
+ CQ_CLEANER_START NUMBER(19),
+ CQ_RETRY_RETENTION NUMBER(19) DEFAULT 0 NOT NULL
) ROWDEPENDENCIES;
CREATE TABLE NEXT_COMPACTION_QUEUE_ID (
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index 220d898..91cee24 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -220,6 +220,9 @@ CREATE TABLE COMPACTION_METRICS_CACHE (
CMC_VERSION number(10) NOT NULL
) ROWDEPENDENCIES;
+-- HIVE-25993
+ALTER TABLE COMPACTION_QUEUE ADD "CQ_RETRY_RETENTION" NUMBER(19) DEFAULT 0 NOT NULL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual;
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index 2570321..c86747f 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1801,7 +1801,8 @@ CREATE TABLE "COMPACTION_QUEUE" (
"CQ_INITIATOR_ID" varchar(128),
"CQ_INITIATOR_VERSION" varchar(128),
"CQ_WORKER_VERSION" varchar(128),
- "CQ_CLEANER_START" bigint
+ "CQ_CLEANER_START" bigint,
+ "CQ_RETRY_RETENTION" bigint not null default 0
);
CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" (
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index 1c804cd..3213a39 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -357,6 +357,9 @@ CREATE TABLE "COMPACTION_METRICS_CACHE" (
"CMC_VERSION" integer NOT NULL
);
+-- HIVE-25993
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_RETRY_RETENTION" bigint NOT NULL DEFAULT 0;
+
-- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';
diff --git a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
index 6457fa2..af68ca9 100644
--- a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql
@@ -128,6 +128,9 @@ CREATE TABLE "COMPACTION_METRICS_CACHE" (
"CMC_VERSION" integer NOT NULL
);
+-- HIVE-25993
+ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_RETRY_RETENTION" integer NOT NULL DEFAULT 0;
+
-- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 3.1.3000 to 4.0.0';