You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2021/10/28 08:58:30 UTC
[hive] branch master updated: HIVE-25650 Make workerId and workerVersionId optional in the FindNextCompactRequest (#2749) (Viktor Csomor, reviewed by Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 28f0512 HIVE-25650 Make workerId and workerVersionId optional in the FindNextCompactRequest (#2749) (Viktor Csomor, reviewed by Laszlo Pinter)
28f0512 is described below
commit 28f05124c50f9f89452f4ffa7910786a8ab1c706
Author: Viktor Csomor <cs...@gmail.com>
AuthorDate: Thu Oct 28 10:58:11 2021 +0200
HIVE-25650 Make workerId and workerVersionId optional in the FindNextCompactRequest (#2749) (Viktor Csomor, reviewed by Laszlo Pinter)
---
.../hadoop/hive/ql/txn/compactor/Worker.java | 9 +-
.../metastore/txn/TestCompactionTxnHandler.java | 67 +++++++------
.../hive/ql/txn/compactor/CompactorTest.java | 5 +-
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 6 +-
.../hive/ql/txn/compactor/TestInitiator.java | 13 ++-
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 37 ++++----
.../src/gen/thrift/gen-cpp/hive_metastore_types.h | 15 ++-
.../hive/metastore/api/FindNextCompactRequest.java | 105 +++++++++++----------
.../gen-php/metastore/FindNextCompactRequest.php | 4 +-
.../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 4 -
.../src/gen/thrift/gen-rb/hive_metastore_types.rb | 6 +-
.../src/main/thrift/hive_metastore.thrift | 4 +-
.../hive/metastore/txn/CompactionTxnHandler.java | 41 +++++---
.../hive/metastore/TestHiveMetaStoreTxns.java | 4 +-
14 files changed, 188 insertions(+), 132 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 1b8a13f..defa9b4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -371,8 +371,11 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
return false;
}
}
- ci = CompactionInfo.optionalCompactionInfoStructToInfo(
- msc.findNextCompact(new FindNextCompactRequest(workerName, runtimeVersion)));
+
+ FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
+ findNextCompactRequest.setWorkerId(workerName);
+ findNextCompactRequest.setWorkerVersion(runtimeVersion);
+ ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(findNextCompactRequest));
LOG.debug("Processing compaction request " + ci);
if (ci == null) {
@@ -803,4 +806,4 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
}
}
}
-}
\ No newline at end of file
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 66ee3d6..ea1abc6 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -92,14 +92,14 @@ public class TestCompactionTxnHandler {
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
long now = System.currentTimeMillis();
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
assertEquals("foo", ci.dbname);
assertEquals("bar", ci.tableName);
assertEquals("ds=today", ci.partName);
assertEquals(CompactionType.MINOR, ci.type);
assertNull(ci.runAs);
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
ci.runAs = "bob";
txnHandler.updateCompactorState(ci, openTxn());
@@ -129,7 +129,7 @@ public class TestCompactionTxnHandler {
long now = System.currentTimeMillis();
boolean expectToday = false;
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
assertEquals("foo", ci.dbname);
assertEquals("bar", ci.tableName);
@@ -138,7 +138,7 @@ public class TestCompactionTxnHandler {
else fail("partition name should have been today or yesterday but was " + ci.partName);
assertEquals(CompactionType.MINOR, ci.type);
- ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
assertEquals("foo", ci.dbname);
assertEquals("bar", ci.tableName);
@@ -146,7 +146,7 @@ public class TestCompactionTxnHandler {
else assertEquals("ds=yesterday", ci.partName);
assertEquals(CompactionType.MINOR, ci.type);
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -160,7 +160,7 @@ public class TestCompactionTxnHandler {
@Test
public void testFindNextToCompactNothingToCompact() throws Exception {
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
}
@Test
@@ -168,11 +168,11 @@ public class TestCompactionTxnHandler {
CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
txnHandler.markCompacted(ci);
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
@@ -194,16 +194,16 @@ public class TestCompactionTxnHandler {
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
txnHandler.markCompacted(ci);
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
@@ -223,18 +223,18 @@ public class TestCompactionTxnHandler {
rqst.setPartitionname("ds=today");
txnHandler.compact(rqst);
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
txnHandler.markCompacted(ci);
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
txnHandler.markCleaned(ci);
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -392,13 +392,13 @@ public class TestCompactionTxnHandler {
rqst.setPartitionname(partitionName);
txnHandler.compact(rqst);
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest(workerId, WORKER_VERSION));
+ CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest(workerId, WORKER_VERSION));
assertNotNull(ci);
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
ci.errorMessage = errorMessage;
txnHandler.markFailed(ci);
- assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest(workerId, WORKER_VERSION)));
+ assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest(workerId, WORKER_VERSION)));
boolean failedCheck = txnHandler.checkFailedCompactions(ci);
assertFalse(failedCheck);
try {
@@ -460,7 +460,7 @@ public class TestCompactionTxnHandler {
rqst.setPartitionname(partitionName);
}
txnHandler.compact(rqst);
- ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
ci.errorMessage = errorMessage;
txnHandler.markFailed(ci);
@@ -473,7 +473,7 @@ public class TestCompactionTxnHandler {
rqst.setPartitionname(partitionName);
}
txnHandler.compact(rqst);
- ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
txnHandler.markCleaned(ci);
}
@@ -486,7 +486,7 @@ public class TestCompactionTxnHandler {
rqst.setPartitionname(partitionName);
}
txnHandler.compact(rqst);
- ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
ci.errorMessage = errorMessage;
txnHandler.markCompacted(ci);
@@ -612,9 +612,9 @@ public class TestCompactionTxnHandler {
txnHandler.compact(rqst);
rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR);
txnHandler.compact(rqst);
- assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred-193892", WORKER_VERSION)));
- assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("bob-193892", WORKER_VERSION)));
- assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred-193893", WORKER_VERSION)));
+ assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION)));
+ assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("bob-193892", WORKER_VERSION)));
+ assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193893", WORKER_VERSION)));
txnHandler.revokeFromLocalWorkers("fred");
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -643,9 +643,9 @@ public class TestCompactionTxnHandler {
rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR);
txnHandler.compact(rqst);
- assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred-193892", WORKER_VERSION)));
+ assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION)));
Thread.sleep(200);
- assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred-193892", WORKER_VERSION)));
+ assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION)));
txnHandler.revokeTimedoutWorkers(100);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -708,7 +708,7 @@ public class TestCompactionTxnHandler {
//simulate prev failed compaction
CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MINOR);
txnHandler.compact(rqst);
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
txnHandler.markFailed(ci);
potentials = txnHandler.findPotentialCompactions(100, -1, 1);
@@ -775,7 +775,7 @@ public class TestCompactionTxnHandler {
CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR);
txnHandler.compact(rqst);
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
- ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
txnHandler.markCompacted(ci);
@@ -799,7 +799,7 @@ public class TestCompactionTxnHandler {
rqst.setPartitionname("bar");
txnHandler.compact(rqst);
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
- ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
txnHandler.markCompacted(ci);
@@ -881,7 +881,7 @@ public class TestCompactionTxnHandler {
assertTrue(enqueueTime <= after);
assertTrue(enqueueTime >= before);
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
txnHandler.markFailed(ci);
checkEnqueueTime(enqueueTime);
@@ -904,7 +904,7 @@ public class TestCompactionTxnHandler {
assertTrue(enqueueTime <= after);
assertTrue(enqueueTime >= before);
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
ci.runAs = "bob";
txnHandler.updateCompactorState(ci, openTxn());
checkEnqueueTime(enqueueTime);
@@ -916,6 +916,13 @@ public class TestCompactionTxnHandler {
checkEnqueueTime(enqueueTime);
}
+ private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) {
+ FindNextCompactRequest request = new FindNextCompactRequest();
+ request.setWorkerId(workerId);
+ request.setWorkerVersion(workerVersion);
+ return request;
+ }
+
private void checkEnqueueTime(long enqueueTime) throws MetaException {
ShowCompactResponse showCompactResponse = txnHandler.showCompact(new ShowCompactRequest());
ShowCompactResponseElement element = showCompactResponse.getCompacts().get(0);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index b3788e4..5dc01f9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -638,7 +638,10 @@ public abstract class CompactorTest {
protected long compactInTxn(CompactionRequest rqst) throws Exception {
txnHandler.compact(rqst);
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION));
+ FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
+ findNextCompactRequest.setWorkerId("fred");
+ findNextCompactRequest.setWorkerVersion(WORKER_VERSION);
+ CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest);
ci.runAs = System.getProperty("user.name");
long compactorTxnId = openTxn(TxnType.COMPACTION);
// Need to create a valid writeIdList to set the highestWriteId in ci
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 665d47c..a1205f4 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -106,7 +106,11 @@ public class TestCleaner extends CompactorTest {
CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
txnHandler.compact(rqst);
- CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", "4.0.0"));
+
+ FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
+ findNextCompactRequest.setWorkerId("fred");
+ findNextCompactRequest.setWorkerVersion("4.0.0");
+ CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest);
ci.runAs = System.getProperty("user.name");
long compactTxn = openTxn(TxnType.COMPACTION);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index ba5b485..ca8e1e0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -79,8 +79,8 @@ public class TestInitiator extends CompactorTest {
rqst = new CompactionRequest("default", "rflw2", CompactionType.MINOR);
txnHandler.compact(rqst);
- txnHandler.findNextToCompact(new FindNextCompactRequest(ServerUtils.hostname() + "-193892", "4.0.0"));
- txnHandler.findNextToCompact(new FindNextCompactRequest("nosuchhost-193892", "4.0.0"));
+ txnHandler.findNextToCompact(aFindNextCompactRequest(ServerUtils.hostname() + "-193892", "4.0.0"));
+ txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", "4.0.0"));
startInitiator();
@@ -106,7 +106,7 @@ public class TestInitiator extends CompactorTest {
CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR);
txnHandler.compact(rqst);
- txnHandler.findNextToCompact(new FindNextCompactRequest("nosuchhost-193892", "4.0.0"));
+ txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", "4.0.0"));
conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS);
@@ -1068,6 +1068,13 @@ public class TestInitiator extends CompactorTest {
Assert.assertEquals(ServerUtils.hostname(), String.join("-", Arrays.copyOfRange(parts, 0, parts.length - 1)));
}
+ private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) {
+ FindNextCompactRequest request = new FindNextCompactRequest();
+ request.setWorkerId(workerId);
+ request.setWorkerVersion(workerVersion);
+ return request;
+ }
+
@Override
boolean useHive130DeltaDirName() {
return false;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index b19b42a..2ff5912 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -28000,10 +28000,12 @@ FindNextCompactRequest::~FindNextCompactRequest() noexcept {
void FindNextCompactRequest::__set_workerId(const std::string& val) {
this->workerId = val;
+__isset.workerId = true;
}
void FindNextCompactRequest::__set_workerVersion(const std::string& val) {
this->workerVersion = val;
+__isset.workerVersion = true;
}
std::ostream& operator<<(std::ostream& out, const FindNextCompactRequest& obj)
{
@@ -28024,8 +28026,6 @@ uint32_t FindNextCompactRequest::read(::apache::thrift::protocol::TProtocol* ipr
using ::apache::thrift::protocol::TProtocolException;
- bool isset_workerId = false;
- bool isset_workerVersion = false;
while (true)
{
@@ -28038,7 +28038,7 @@ uint32_t FindNextCompactRequest::read(::apache::thrift::protocol::TProtocol* ipr
case 1:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readString(this->workerId);
- isset_workerId = true;
+ this->__isset.workerId = true;
} else {
xfer += iprot->skip(ftype);
}
@@ -28046,7 +28046,7 @@ uint32_t FindNextCompactRequest::read(::apache::thrift::protocol::TProtocol* ipr
case 2:
if (ftype == ::apache::thrift::protocol::T_STRING) {
xfer += iprot->readString(this->workerVersion);
- isset_workerVersion = true;
+ this->__isset.workerVersion = true;
} else {
xfer += iprot->skip(ftype);
}
@@ -28060,10 +28060,6 @@ uint32_t FindNextCompactRequest::read(::apache::thrift::protocol::TProtocol* ipr
xfer += iprot->readStructEnd();
- if (!isset_workerId)
- throw TProtocolException(TProtocolException::INVALID_DATA);
- if (!isset_workerVersion)
- throw TProtocolException(TProtocolException::INVALID_DATA);
return xfer;
}
@@ -28072,14 +28068,16 @@ uint32_t FindNextCompactRequest::write(::apache::thrift::protocol::TProtocol* op
::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot);
xfer += oprot->writeStructBegin("FindNextCompactRequest");
- xfer += oprot->writeFieldBegin("workerId", ::apache::thrift::protocol::T_STRING, 1);
- xfer += oprot->writeString(this->workerId);
- xfer += oprot->writeFieldEnd();
-
- xfer += oprot->writeFieldBegin("workerVersion", ::apache::thrift::protocol::T_STRING, 2);
- xfer += oprot->writeString(this->workerVersion);
- xfer += oprot->writeFieldEnd();
-
+ if (this->__isset.workerId) {
+ xfer += oprot->writeFieldBegin("workerId", ::apache::thrift::protocol::T_STRING, 1);
+ xfer += oprot->writeString(this->workerId);
+ xfer += oprot->writeFieldEnd();
+ }
+ if (this->__isset.workerVersion) {
+ xfer += oprot->writeFieldBegin("workerVersion", ::apache::thrift::protocol::T_STRING, 2);
+ xfer += oprot->writeString(this->workerVersion);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -28089,22 +28087,25 @@ void swap(FindNextCompactRequest &a, FindNextCompactRequest &b) {
using ::std::swap;
swap(a.workerId, b.workerId);
swap(a.workerVersion, b.workerVersion);
+ swap(a.__isset, b.__isset);
}
FindNextCompactRequest::FindNextCompactRequest(const FindNextCompactRequest& other1006) {
workerId = other1006.workerId;
workerVersion = other1006.workerVersion;
+ __isset = other1006.__isset;
}
FindNextCompactRequest& FindNextCompactRequest::operator=(const FindNextCompactRequest& other1007) {
workerId = other1007.workerId;
workerVersion = other1007.workerVersion;
+ __isset = other1007.__isset;
return *this;
}
void FindNextCompactRequest::printTo(std::ostream& out) const {
using ::apache::thrift::to_string;
out << "FindNextCompactRequest(";
- out << "workerId=" << to_string(workerId);
- out << ", " << "workerVersion=" << to_string(workerVersion);
+ out << "workerId="; (__isset.workerId ? (out << to_string(workerId)) : (out << "<null>"));
+ out << ", " << "workerVersion="; (__isset.workerVersion ? (out << to_string(workerVersion)) : (out << "<null>"));
out << ")";
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index 6787844..025e733 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -10598,6 +10598,11 @@ void swap(GetLatestCommittedCompactionInfoResponse &a, GetLatestCommittedCompact
std::ostream& operator<<(std::ostream& out, const GetLatestCommittedCompactionInfoResponse& obj);
+typedef struct _FindNextCompactRequest__isset {
+ _FindNextCompactRequest__isset() : workerId(false), workerVersion(false) {}
+ bool workerId :1;
+ bool workerVersion :1;
+} _FindNextCompactRequest__isset;
class FindNextCompactRequest : public virtual ::apache::thrift::TBase {
public:
@@ -10611,15 +10616,21 @@ class FindNextCompactRequest : public virtual ::apache::thrift::TBase {
std::string workerId;
std::string workerVersion;
+ _FindNextCompactRequest__isset __isset;
+
void __set_workerId(const std::string& val);
void __set_workerVersion(const std::string& val);
bool operator == (const FindNextCompactRequest & rhs) const
{
- if (!(workerId == rhs.workerId))
+ if (__isset.workerId != rhs.__isset.workerId)
return false;
- if (!(workerVersion == rhs.workerVersion))
+ else if (__isset.workerId && !(workerId == rhs.workerId))
+ return false;
+ if (__isset.workerVersion != rhs.__isset.workerVersion)
+ return false;
+ else if (__isset.workerVersion && !(workerVersion == rhs.workerVersion))
return false;
return true;
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FindNextCompactRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FindNextCompactRequest.java
index 3b20e8a..568887a 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FindNextCompactRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FindNextCompactRequest.java
@@ -17,8 +17,8 @@ package org.apache.hadoop.hive.metastore.api;
private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new FindNextCompactRequestStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new FindNextCompactRequestTupleSchemeFactory();
- private @org.apache.thrift.annotation.Nullable java.lang.String workerId; // required
- private @org.apache.thrift.annotation.Nullable java.lang.String workerVersion; // required
+ private @org.apache.thrift.annotation.Nullable java.lang.String workerId; // optional
+ private @org.apache.thrift.annotation.Nullable java.lang.String workerVersion; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -84,12 +84,13 @@ package org.apache.hadoop.hive.metastore.api;
}
// isset id assignments
+ private static final _Fields optionals[] = {_Fields.WORKER_ID,_Fields.WORKER_VERSION};
public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
- tmpMap.put(_Fields.WORKER_ID, new org.apache.thrift.meta_data.FieldMetaData("workerId", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ tmpMap.put(_Fields.WORKER_ID, new org.apache.thrift.meta_data.FieldMetaData("workerId", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
- tmpMap.put(_Fields.WORKER_VERSION, new org.apache.thrift.meta_data.FieldMetaData("workerVersion", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ tmpMap.put(_Fields.WORKER_VERSION, new org.apache.thrift.meta_data.FieldMetaData("workerVersion", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(FindNextCompactRequest.class, metaDataMap);
@@ -98,15 +99,6 @@ package org.apache.hadoop.hive.metastore.api;
public FindNextCompactRequest() {
}
- public FindNextCompactRequest(
- java.lang.String workerId,
- java.lang.String workerVersion)
- {
- this();
- this.workerId = workerId;
- this.workerVersion = workerVersion;
- }
-
/**
* Performs a deep copy on <i>other</i>.
*/
@@ -324,35 +316,31 @@ package org.apache.hadoop.hive.metastore.api;
java.lang.StringBuilder sb = new java.lang.StringBuilder("FindNextCompactRequest(");
boolean first = true;
- sb.append("workerId:");
- if (this.workerId == null) {
- sb.append("null");
- } else {
- sb.append(this.workerId);
- }
- first = false;
- if (!first) sb.append(", ");
- sb.append("workerVersion:");
- if (this.workerVersion == null) {
- sb.append("null");
- } else {
- sb.append(this.workerVersion);
- }
- first = false;
+ if (isSetWorkerId()) {
+ sb.append("workerId:");
+ if (this.workerId == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.workerId);
+ }
+ first = false;
+ }
+ if (isSetWorkerVersion()) {
+ if (!first) sb.append(", ");
+ sb.append("workerVersion:");
+ if (this.workerVersion == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.workerVersion);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
public void validate() throws org.apache.thrift.TException {
// check for required fields
- if (!isSetWorkerId()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'workerId' is unset! Struct:" + toString());
- }
-
- if (!isSetWorkerVersion()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'workerVersion' is unset! Struct:" + toString());
- }
-
// check for sub-struct validity
}
@@ -420,14 +408,18 @@ package org.apache.hadoop.hive.metastore.api;
oprot.writeStructBegin(STRUCT_DESC);
if (struct.workerId != null) {
- oprot.writeFieldBegin(WORKER_ID_FIELD_DESC);
- oprot.writeString(struct.workerId);
- oprot.writeFieldEnd();
+ if (struct.isSetWorkerId()) {
+ oprot.writeFieldBegin(WORKER_ID_FIELD_DESC);
+ oprot.writeString(struct.workerId);
+ oprot.writeFieldEnd();
+ }
}
if (struct.workerVersion != null) {
- oprot.writeFieldBegin(WORKER_VERSION_FIELD_DESC);
- oprot.writeString(struct.workerVersion);
- oprot.writeFieldEnd();
+ if (struct.isSetWorkerVersion()) {
+ oprot.writeFieldBegin(WORKER_VERSION_FIELD_DESC);
+ oprot.writeString(struct.workerVersion);
+ oprot.writeFieldEnd();
+ }
}
oprot.writeFieldStop();
oprot.writeStructEnd();
@@ -446,17 +438,34 @@ package org.apache.hadoop.hive.metastore.api;
@Override
public void write(org.apache.thrift.protocol.TProtocol prot, FindNextCompactRequest struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
- oprot.writeString(struct.workerId);
- oprot.writeString(struct.workerVersion);
+ java.util.BitSet optionals = new java.util.BitSet();
+ if (struct.isSetWorkerId()) {
+ optionals.set(0);
+ }
+ if (struct.isSetWorkerVersion()) {
+ optionals.set(1);
+ }
+ oprot.writeBitSet(optionals, 2);
+ if (struct.isSetWorkerId()) {
+ oprot.writeString(struct.workerId);
+ }
+ if (struct.isSetWorkerVersion()) {
+ oprot.writeString(struct.workerVersion);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot, FindNextCompactRequest struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot;
- struct.workerId = iprot.readString();
- struct.setWorkerIdIsSet(true);
- struct.workerVersion = iprot.readString();
- struct.setWorkerVersionIsSet(true);
+ java.util.BitSet incoming = iprot.readBitSet(2);
+ if (incoming.get(0)) {
+ struct.workerId = iprot.readString();
+ struct.setWorkerIdIsSet(true);
+ }
+ if (incoming.get(1)) {
+ struct.workerVersion = iprot.readString();
+ struct.setWorkerVersionIsSet(true);
+ }
}
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/FindNextCompactRequest.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/FindNextCompactRequest.php
index 53e2cee..9950693 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/FindNextCompactRequest.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/FindNextCompactRequest.php
@@ -23,12 +23,12 @@ class FindNextCompactRequest
static public $_TSPEC = array(
1 => array(
'var' => 'workerId',
- 'isRequired' => true,
+ 'isRequired' => false,
'type' => TType::STRING,
),
2 => array(
'var' => 'workerVersion',
- 'isRequired' => true,
+ 'isRequired' => false,
'type' => TType::STRING,
),
);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 02a1ba9..756e31e 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -16065,10 +16065,6 @@ class FindNextCompactRequest(object):
oprot.writeStructEnd()
def validate(self):
- if self.workerId is None:
- raise TProtocolException(message='Required field workerId is unset!')
- if self.workerVersion is None:
- raise TProtocolException(message='Required field workerVersion is unset!')
return
def __repr__(self):
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 3d92e9c..57749a9 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -4621,15 +4621,13 @@ class FindNextCompactRequest
WORKERVERSION = 2
FIELDS = {
- WORKERID => {:type => ::Thrift::Types::STRING, :name => 'workerId'},
- WORKERVERSION => {:type => ::Thrift::Types::STRING, :name => 'workerVersion'}
+ WORKERID => {:type => ::Thrift::Types::STRING, :name => 'workerId', :optional => true},
+ WORKERVERSION => {:type => ::Thrift::Types::STRING, :name => 'workerVersion', :optional => true}
}
def struct_fields; FIELDS; end
def validate
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field workerId is unset!') unless @workerId
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field workerVersion is unset!') unless @workerVersion
end
::Thrift::Struct.generate_accessors self
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 941fe58..e1d7006 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1331,8 +1331,8 @@ struct GetLatestCommittedCompactionInfoResponse {
}
struct FindNextCompactRequest {
- 1: required string workerId,
- 2: required string workerVersion
+ 1: optional string workerId,
+ 2: optional string workerVersion
}
struct AddDynamicPartitions {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index d37e6ad..79c06be 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -175,7 +175,9 @@ class CompactionTxnHandler extends TxnHandler {
@Override
@RetrySemantics.SafeToRetry
public CompactionInfo findNextToCompact(String workerId) throws MetaException {
- return findNextToCompact(new FindNextCompactRequest(workerId, null));
+ FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest();
+ findNextCompactRequest.setWorkerId(workerId);
+ return findNextToCompact(findNextCompactRequest);
}
/**
@@ -190,6 +192,7 @@ class CompactionTxnHandler extends TxnHandler {
if (rqst == null) {
throw new MetaException("FindNextCompactRequest is null");
}
+
Connection dbConn = null;
Statement stmt = null;
//need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725)
@@ -198,10 +201,10 @@ class CompactionTxnHandler extends TxnHandler {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
+ String query = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " +
"\"CQ_TYPE\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "'";
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
+ LOG.debug("Going to execute query <" + query + ">");
+ rs = stmt.executeQuery(query);
if (!rs.next()) {
LOG.debug("No compactions found ready to compact");
dbConn.rollback();
@@ -216,25 +219,37 @@ class CompactionTxnHandler extends TxnHandler {
info.partName = rs.getString(4);
info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0));
info.properties = rs.getString(6);
+ // workerId and workerVersion are both optional in the request: an unset one is stored as SQL NULL, never as the text 'null'.
+ String workerId = rqst.getWorkerId();
+ String workerVersion = rqst.getWorkerVersion();
+ String workerIdSqlValue = (workerId == null) ? "NULL" : ("'" + workerId + "'");
+ String workerVersionSqlValue = (workerVersion == null) ? "NULL" : ("'" + workerVersion + "'");
// Now, update this record as being worked on by this worker.
long now = getDbTime(dbConn);
- s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = '" + rqst.getWorkerId() + "', " +
- "\"CQ_WORKER_VERSION\" = '" + rqst.getWorkerVersion() + "', " +
- "\"CQ_START\" = " + now + ", \"CQ_STATE\" = '" + WORKING_STATE + "' WHERE \"CQ_ID\" = " + info.id +
- " AND \"CQ_STATE\"='" + INITIATED_STATE + "'";
- LOG.debug("Going to execute update <" + s + ">");
- int updCount = updStmt.executeUpdate(s);
+ query = "" +
+ "UPDATE " +
+ " \"COMPACTION_QUEUE\" " +
+ "SET " +
+ " \"CQ_WORKER_ID\" = " + workerIdSqlValue + ", " +
+ " \"CQ_WORKER_VERSION\" = " + workerVersionSqlValue + ", " +
+ " \"CQ_START\" = " + now + ", " +
+ " \"CQ_STATE\" = '" + WORKING_STATE + "' " +
+ "WHERE \"CQ_ID\" = " + info.id +
+ " AND \"CQ_STATE\"='" + INITIATED_STATE + "'";
+ // NOTE(review): the worker values are still concatenated into the SQL text; binding them through a PreparedStatement would be safer.
+ LOG.debug("Going to execute update <" + query + ">");
+ int updCount = updStmt.executeUpdate(query);
if(updCount == 1) {
dbConn.commit();
return info;
}
if(updCount == 0) {
- LOG.debug("Worker {} (version: {}) picked up {}", rqst.getWorkerId(), rqst.getWorkerVersion(), info);
+ LOG.debug("Compaction {} was already claimed by another worker; worker {} (version: {}) moves on to the next candidate", info, workerId, workerVersion);
continue;
}
LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " +
- info + ". updCnt=" + updCount + ". workerId=" + rqst.getWorkerId() +
- ". workerVersion=" + rqst.getWorkerVersion());
+ info + ". updCnt=" + updCount + ". workerId=" + workerId +
+ ". workerVersion=" + workerVersion);
dbConn.rollback();
return null;
} while( rs.next());
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
index 10f5f40..ff7e2d1 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java
@@ -419,7 +419,9 @@ public class TestHiveMetaStoreTxns {
tbl = client.getTable(dbName, tblName);
client.compact2(tbl.getDbName(), tbl.getTableName(), null, CompactionType.MINOR, new HashMap<>());
- OptionalCompactionInfoStruct optionalCi = client.findNextCompact(new FindNextCompactRequest("myworker", null));
+ FindNextCompactRequest compactRequest = new FindNextCompactRequest();
+ compactRequest.setWorkerId("myworker");
+ OptionalCompactionInfoStruct optionalCi = client.findNextCompact(compactRequest);
client.markCleaned(optionalCi.getCi());
GetLatestCommittedCompactionInfoRequest rqst = new GetLatestCommittedCompactionInfoRequest();