You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/04/30 09:37:33 UTC
[hive] branch master updated: HIVE-23293: Locks: Implement zero-wait readers (Denys Kuzmenko reviewed by Marton Bod, Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 36cc7bc HIVE-23293: Locks: Implement zero-wait readers (Denys Kuzmenko reviewed by Marton Bod, Peter Vary)
36cc7bc is described below
commit 36cc7bc2c80b958344cd5cdd5003b23ff35c6033
Author: Denys Kuzmenko <dk...@apache.org>
AuthorDate: Thu Apr 30 11:36:02 2020 +0200
HIVE-23293: Locks: Implement zero-wait readers (Denys Kuzmenko reviewed by Marton Bod, Peter Vary)
---
.../java/org/apache/hadoop/hive/ql/ErrorMsg.java | 4 +-
.../hadoop/hive/ql/lockmgr/DbLockManager.java | 6 +-
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 5 +-
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 96 +++++++++++++-----
.../test/results/clientnegative/insert_into1.q.out | 2 +-
.../test/results/clientnegative/insert_into2.q.out | 2 +-
.../test/results/clientnegative/insert_into3.q.out | 2 +-
.../test/results/clientnegative/insert_into4.q.out | 2 +-
.../lockneg_try_drop_locked_db.q.out | 2 +-
.../hadoop/hive/metastore/api/LockRequest.java | 109 +++++++++++++++++++-
.../hadoop/hive/metastore/api/LockResponse.java | 112 ++++++++++++++++++++-
.../src/gen/thrift/gen-php/metastore/Types.php | 46 +++++++++
.../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 30 +++++-
.../src/gen/thrift/gen-rb/hive_metastore_types.rb | 8 +-
.../hadoop/hive/metastore/LockRequestBuilder.java | 5 +
.../src/main/thrift/hive_metastore.thrift | 2 +
.../hadoop/hive/metastore/txn/TxnHandler.java | 58 +++++++----
17 files changed, 426 insertions(+), 65 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 8e643fe..d732004 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -176,8 +176,8 @@ public enum ErrorMsg {
LOAD_INTO_NON_NATIVE(10101, "A non-native table cannot be used as target for LOAD"),
LOCKMGR_NOT_SPECIFIED(10102, "Lock manager not specified correctly, set hive.lock.manager"),
LOCKMGR_NOT_INITIALIZED(10103, "Lock manager could not be initialized, check hive.lock.manager "),
- LOCK_CANNOT_BE_ACQUIRED(10104, "Locks on the underlying objects cannot be acquired. "
- + "retry after some time"),
+ LOCK_CANNOT_BE_ACQUIRED(10104, "Locks on the underlying objects cannot be acquired, "
+ + "retry after some time."),
ZOOKEEPER_CLIENT_COULD_NOT_BE_INITIALIZED(10105, "Check hive.zookeeper.quorum "
+ "and hive.zookeeper.client.port"),
OVERWRITE_ARCHIVED_PART(10106, "Cannot overwrite an archived partition. " +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
index fb5a306..84396e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
@@ -140,9 +140,9 @@ public final class DbLockManager implements HiveLockManager{
}
locks.add(hl);
if (res.getState() != LockState.ACQUIRED) {
- if(res.getState() == LockState.WAITING) {
- LOG.error("Unable to acquire locks for lockId={} after {} retries (retries took {} ms). QueryId={}",
- res.getLockid(), numRetries, retryDuration, queryId);
+ LOG.error("Unable to acquire locks for lockId={} after {} retries (retries took {} ms). QueryId={}\n{}",
+ res.getLockid(), numRetries, retryDuration, queryId, res);
+ if (res.getState() == LockState.WAITING) {
/**
* the {@link #unlock(HiveLock)} here is more about future proofing when support for
* multi-statement txns is added. In that case it's reasonable for the client
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index b4dac43..a08af7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -412,6 +412,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
rqstBuilder.setTransactionId(txnId)
.setUser(username);
+ rqstBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) ||
+ !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
// Make sure we need locks. It's possible there's nothing to lock in
// this operation.
@@ -420,11 +422,10 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
return null;
}
List<LockComponent> lockComponents = AcidUtils.makeLockComponents(plan.getOutputs(), plan.getInputs(), conf);
-
lockComponents.addAll(getGlobalLocks(ctx.getConf()));
//It's possible there's nothing to lock even if we have w/r entities.
- if(lockComponents.isEmpty()) {
+ if (lockComponents.isEmpty()) {
LOG.debug("No locks needed for queryId=" + queryId);
return null;
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index f90396b..f0ab8af 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -2529,7 +2529,17 @@ public class TestDbTxnManager2 {
@Test
public void testFairness() throws Exception {
- dropTable(new String[] {"T6"});
+ testFairness(false);
+ }
+
+ @Test
+ public void testFairnessZeroWaitRead() throws Exception {
+ testFairness(true);
+ }
+
+ private void testFairness(boolean zeroWaitRead) throws Exception {
+ dropTable(new String[]{"T6"});
+ conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !zeroWaitRead);
driver.run("create table if not exists T6(a int)");
driver.compileAndRespond("select a from T6", true);
txnMgr.acquireLocks(driver.getPlan(), ctx, "Fifer"); //gets S lock on T6
@@ -2541,18 +2551,30 @@ public class TestDbTxnManager2 {
List<ShowLocksResponseElement> locks = getLocks();
Assert.assertEquals("Unexpected lock count", 2, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks);
- checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks);
+ long extLockId = checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks).getLockid();
HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
swapTxnManager(txnMgr3);
//this should block behind the X lock on T6
//this is a contrived example, in practice this query would of course fail after drop table
driver.compileAndRespond("select a from T6", true);
- ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); //gets S lock on T6
+ try {
+ ((DbTxnManager) txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false); //gets S lock on T6
+ } catch (LockException ex) {
+ Assert.assertTrue(zeroWaitRead);
+ Assert.assertEquals("Exception msg didn't match",
+ ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg() + " LockResponse(lockid:" + (extLockId + 1) +
+ ", state:NOT_ACQUIRED, errorMessage:Unable to acquire read lock due to an exclusive lock" +
+ " {lockid:" + extLockId + " intLockId:1 txnid:" + txnMgr2.getCurrentTxnId() +
+ " db:default table:t6 partition:null state:WAITING type:EXCLUSIVE})",
+ ex.getMessage());
+ }
locks = getLocks();
- Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ Assert.assertEquals("Unexpected lock count", (zeroWaitRead ? 2 : 3), locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T6", null, locks);
- checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T6", null, locks);
+ if (!zeroWaitRead) {
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T6", null, locks);
+ }
checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T6", null, locks);
}
@@ -2568,7 +2590,17 @@ public class TestDbTxnManager2 {
*/
@Test
public void testFairness2() throws Exception {
+ testFairness2(false);
+ }
+
+ @Test
+ public void testFairness2ZeroWaitRead() throws Exception {
+ testFairness2(true);
+ }
+
+ private void testFairness2(boolean zeroWaitRead) throws Exception {
dropTable(new String[]{"T7"});
+ conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !zeroWaitRead);
driver.run("create table if not exists T7 (a int) " +
"partitioned by (p int) stored as orc TBLPROPERTIES ('transactional'='true')");
driver.run("insert into T7 partition(p) values(1,1),(1,2)"); //create 2 partitions
@@ -2584,42 +2616,56 @@ public class TestDbTxnManager2 {
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks);
- checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", locks);
+ long extLockId = checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", locks).getLockid();
HiveTxnManager txnMgr3 = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
swapTxnManager(txnMgr3);
//this should block behind the X lock on T7.p=1
driver.compileAndRespond("select a from T7", true);
//tries to get S lock on T7, S on T7.p=1 and S on T7.p=2
- ((DbTxnManager)txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false);
+ try {
+ ((DbTxnManager) txnMgr3).acquireLocks(driver.getPlan(), ctx, "Fifer", false);
+ } catch (LockException ex) {
+ Assert.assertTrue(zeroWaitRead);
+ Assert.assertEquals("Exception msg didn't match",
+ ErrorMsg.LOCK_CANNOT_BE_ACQUIRED.getMsg() + " LockResponse(lockid:" + (extLockId + 1) +
+ ", state:NOT_ACQUIRED, errorMessage:Unable to acquire read lock due to an exclusive lock" +
+ " {lockid:" + extLockId + " intLockId:1 txnid:" + txnMgr2.getCurrentTxnId() +
+ " db:default table:t7 partition:p=1 state:WAITING type:EXCLUSIVE})",
+ ex.getMessage());
+ }
locks = getLocks();
- Assert.assertEquals("Unexpected lock count", 7, locks.size());
+ Assert.assertEquals("Unexpected lock count", (zeroWaitRead ? 4 : 7), locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks);
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks);
- checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks);
- checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks);
- checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks);
+ if (!zeroWaitRead) {
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks);
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks);
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks);
+ }
checkLock(LockType.EXCLUSIVE, LockState.WAITING, "default", "T7", "p=1", locks);
txnMgr.commitTxn(); //release locks from "select a from T7" - to unblock hte drop partition
//retest the the "drop partiton" X lock
- ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(6).getLockid());
+ ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(zeroWaitRead ? 3 : 6).getLockid());
locks = getLocks();
- Assert.assertEquals("Unexpected lock count", 4, locks.size());
+ Assert.assertEquals("Unexpected lock count", (zeroWaitRead ? 1 : 4), locks.size());
checkLock(LockType.EXCLUSIVE, LockState.ACQUIRED, "default", "T7", "p=1", locks);
- checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks);
- checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks);
- checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks);
-
- txnMgr2.rollbackTxn(); //release the X lock on T7.p=1
- //re-test the locks
- ((DbLockManager)txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); //S lock on T7
- locks = getLocks();
- Assert.assertEquals("Unexpected lock count", 3, locks.size());
- checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks);
- checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks);
- checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks);
+ if (!zeroWaitRead) {
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", null, locks);
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=1", locks);
+ checkLock(LockType.SHARED_READ, LockState.WAITING, "default", "T7", "p=2", locks);
+
+ txnMgr2.rollbackTxn(); //release the X lock on T7.p=1
+ //re-test the locks
+ ((DbLockManager) txnMgr2.getLockManager()).checkLock(locks.get(1).getLockid()); //S lock on T7
+ locks = getLocks();
+ Assert.assertEquals("Unexpected lock count", 3, locks.size());
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", null, locks);
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=1", locks);
+ checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "T7", "p=2", locks);
+ }
}
@Test
diff --git a/ql/src/test/results/clientnegative/insert_into1.q.out b/ql/src/test/results/clientnegative/insert_into1.q.out
index 066f04d..0d3439e 100644
--- a/ql/src/test/results/clientnegative/insert_into1.q.out
+++ b/ql/src/test/results/clientnegative/insert_into1.q.out
@@ -15,4 +15,4 @@ PREHOOK: type: LOCKTABLE
POSTHOOK: query: LOCK TABLE insert_into1_neg SHARED
POSTHOOK: type: LOCKTABLE
Unable to acquire IMPLICIT, EXCLUSIVE lock default@insert_into1_neg after 5 attempts.
-FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired. retry after some time
+FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired, retry after some time.
diff --git a/ql/src/test/results/clientnegative/insert_into2.q.out b/ql/src/test/results/clientnegative/insert_into2.q.out
index b839efe..f19d67b 100644
--- a/ql/src/test/results/clientnegative/insert_into2.q.out
+++ b/ql/src/test/results/clientnegative/insert_into2.q.out
@@ -15,4 +15,4 @@ PREHOOK: type: LOCKTABLE
POSTHOOK: query: LOCK TABLE insert_into1_neg EXCLUSIVE
POSTHOOK: type: LOCKTABLE
Unable to acquire IMPLICIT, EXCLUSIVE lock default@insert_into1_neg after 5 attempts.
-FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired. retry after some time
+FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired, retry after some time.
diff --git a/ql/src/test/results/clientnegative/insert_into3.q.out b/ql/src/test/results/clientnegative/insert_into3.q.out
index 91aeb5b..44be168 100644
--- a/ql/src/test/results/clientnegative/insert_into3.q.out
+++ b/ql/src/test/results/clientnegative/insert_into3.q.out
@@ -29,4 +29,4 @@ PREHOOK: type: LOCKTABLE
POSTHOOK: query: LOCK TABLE insert_into3_neg PARTITION (ds='1') SHARED
POSTHOOK: type: LOCKTABLE
Unable to acquire IMPLICIT, EXCLUSIVE lock default@insert_into3_neg@ds=1 after 5 attempts.
-FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired. retry after some time
+FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired, retry after some time.
diff --git a/ql/src/test/results/clientnegative/insert_into4.q.out b/ql/src/test/results/clientnegative/insert_into4.q.out
index 303fb6a..0cc02ff 100644
--- a/ql/src/test/results/clientnegative/insert_into4.q.out
+++ b/ql/src/test/results/clientnegative/insert_into4.q.out
@@ -29,4 +29,4 @@ PREHOOK: type: LOCKTABLE
POSTHOOK: query: LOCK TABLE insert_into3_neg PARTITION (ds='1') EXCLUSIVE
POSTHOOK: type: LOCKTABLE
Unable to acquire IMPLICIT, EXCLUSIVE lock default@insert_into3_neg@ds=1 after 5 attempts.
-FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired. retry after some time
+FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired, retry after some time.
diff --git a/ql/src/test/results/clientnegative/lockneg_try_drop_locked_db.q.out b/ql/src/test/results/clientnegative/lockneg_try_drop_locked_db.q.out
index e66965e..964b776 100644
--- a/ql/src/test/results/clientnegative/lockneg_try_drop_locked_db.q.out
+++ b/ql/src/test/results/clientnegative/lockneg_try_drop_locked_db.q.out
@@ -17,4 +17,4 @@ PREHOOK: type: SHOWLOCKS
POSTHOOK: query: show locks database lockneg9
POSTHOOK: type: SHOWLOCKS
Unable to acquire IMPLICIT, EXCLUSIVE lock lockneg9 after 1 attempts.
-FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired. retry after some time
+FAILED: Error in acquiring locks: Locks on the underlying objects cannot be acquired, retry after some time.
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java
index ad6cc04..06722f1 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockRequest.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField USER_FIELD_DESC = new org.apache.thrift.protocol.TField("user", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final org.apache.thrift.protocol.TField HOSTNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("hostname", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField AGENT_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("agentInfo", org.apache.thrift.protocol.TType.STRING, (short)5);
+ private static final org.apache.thrift.protocol.TField ZERO_WAIT_READ_ENABLED_FIELD_DESC = new org.apache.thrift.protocol.TField("zeroWaitReadEnabled", org.apache.thrift.protocol.TType.BOOL, (short)6);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
private String user; // required
private String hostname; // required
private String agentInfo; // optional
+ private boolean zeroWaitReadEnabled; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -62,7 +64,8 @@ import org.slf4j.LoggerFactory;
TXNID((short)2, "txnid"),
USER((short)3, "user"),
HOSTNAME((short)4, "hostname"),
- AGENT_INFO((short)5, "agentInfo");
+ AGENT_INFO((short)5, "agentInfo"),
+ ZERO_WAIT_READ_ENABLED((short)6, "zeroWaitReadEnabled");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -87,6 +90,8 @@ import org.slf4j.LoggerFactory;
return HOSTNAME;
case 5: // AGENT_INFO
return AGENT_INFO;
+ case 6: // ZERO_WAIT_READ_ENABLED
+ return ZERO_WAIT_READ_ENABLED;
default:
return null;
}
@@ -128,8 +133,9 @@ import org.slf4j.LoggerFactory;
// isset id assignments
private static final int __TXNID_ISSET_ID = 0;
+ private static final int __ZEROWAITREADENABLED_ISSET_ID = 1;
private byte __isset_bitfield = 0;
- private static final _Fields optionals[] = {_Fields.TXNID,_Fields.AGENT_INFO};
+ private static final _Fields optionals[] = {_Fields.TXNID,_Fields.AGENT_INFO,_Fields.ZERO_WAIT_READ_ENABLED};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -144,6 +150,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.AGENT_INFO, new org.apache.thrift.meta_data.FieldMetaData("agentInfo", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.ZERO_WAIT_READ_ENABLED, new org.apache.thrift.meta_data.FieldMetaData("zeroWaitReadEnabled", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LockRequest.class, metaDataMap);
}
@@ -151,6 +159,8 @@ import org.slf4j.LoggerFactory;
public LockRequest() {
this.agentInfo = "Unknown";
+ this.zeroWaitReadEnabled = false;
+
}
public LockRequest(
@@ -186,6 +196,7 @@ import org.slf4j.LoggerFactory;
if (other.isSetAgentInfo()) {
this.agentInfo = other.agentInfo;
}
+ this.zeroWaitReadEnabled = other.zeroWaitReadEnabled;
}
public LockRequest deepCopy() {
@@ -201,6 +212,8 @@ import org.slf4j.LoggerFactory;
this.hostname = null;
this.agentInfo = "Unknown";
+ this.zeroWaitReadEnabled = false;
+
}
public int getComponentSize() {
@@ -332,6 +345,28 @@ import org.slf4j.LoggerFactory;
}
}
+ public boolean isZeroWaitReadEnabled() {
+ return this.zeroWaitReadEnabled;
+ }
+
+ public void setZeroWaitReadEnabled(boolean zeroWaitReadEnabled) {
+ this.zeroWaitReadEnabled = zeroWaitReadEnabled;
+ setZeroWaitReadEnabledIsSet(true);
+ }
+
+ public void unsetZeroWaitReadEnabled() {
+ __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __ZEROWAITREADENABLED_ISSET_ID);
+ }
+
+ /** Returns true if field zeroWaitReadEnabled is set (has been assigned a value) and false otherwise */
+ public boolean isSetZeroWaitReadEnabled() {
+ return EncodingUtils.testBit(__isset_bitfield, __ZEROWAITREADENABLED_ISSET_ID);
+ }
+
+ public void setZeroWaitReadEnabledIsSet(boolean value) {
+ __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __ZEROWAITREADENABLED_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case COMPONENT:
@@ -374,6 +409,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case ZERO_WAIT_READ_ENABLED:
+ if (value == null) {
+ unsetZeroWaitReadEnabled();
+ } else {
+ setZeroWaitReadEnabled((Boolean)value);
+ }
+ break;
+
}
}
@@ -394,6 +437,9 @@ import org.slf4j.LoggerFactory;
case AGENT_INFO:
return getAgentInfo();
+ case ZERO_WAIT_READ_ENABLED:
+ return isZeroWaitReadEnabled();
+
}
throw new IllegalStateException();
}
@@ -415,6 +461,8 @@ import org.slf4j.LoggerFactory;
return isSetHostname();
case AGENT_INFO:
return isSetAgentInfo();
+ case ZERO_WAIT_READ_ENABLED:
+ return isSetZeroWaitReadEnabled();
}
throw new IllegalStateException();
}
@@ -477,6 +525,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_zeroWaitReadEnabled = true && this.isSetZeroWaitReadEnabled();
+ boolean that_present_zeroWaitReadEnabled = true && that.isSetZeroWaitReadEnabled();
+ if (this_present_zeroWaitReadEnabled || that_present_zeroWaitReadEnabled) {
+ if (!(this_present_zeroWaitReadEnabled && that_present_zeroWaitReadEnabled))
+ return false;
+ if (this.zeroWaitReadEnabled != that.zeroWaitReadEnabled)
+ return false;
+ }
+
return true;
}
@@ -509,6 +566,11 @@ import org.slf4j.LoggerFactory;
if (present_agentInfo)
list.add(agentInfo);
+ boolean present_zeroWaitReadEnabled = true && (isSetZeroWaitReadEnabled());
+ list.add(present_zeroWaitReadEnabled);
+ if (present_zeroWaitReadEnabled)
+ list.add(zeroWaitReadEnabled);
+
return list.hashCode();
}
@@ -570,6 +632,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetZeroWaitReadEnabled()).compareTo(other.isSetZeroWaitReadEnabled());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetZeroWaitReadEnabled()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.zeroWaitReadEnabled, other.zeroWaitReadEnabled);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -629,6 +701,12 @@ import org.slf4j.LoggerFactory;
}
first = false;
}
+ if (isSetZeroWaitReadEnabled()) {
+ if (!first) sb.append(", ");
+ sb.append("zeroWaitReadEnabled:");
+ sb.append(this.zeroWaitReadEnabled);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -737,6 +815,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 6: // ZERO_WAIT_READ_ENABLED
+ if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+ struct.zeroWaitReadEnabled = iprot.readBool();
+ struct.setZeroWaitReadEnabledIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -784,6 +870,11 @@ import org.slf4j.LoggerFactory;
oprot.writeFieldEnd();
}
}
+ if (struct.isSetZeroWaitReadEnabled()) {
+ oprot.writeFieldBegin(ZERO_WAIT_READ_ENABLED_FIELD_DESC);
+ oprot.writeBool(struct.zeroWaitReadEnabled);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -817,13 +908,19 @@ import org.slf4j.LoggerFactory;
if (struct.isSetAgentInfo()) {
optionals.set(1);
}
- oprot.writeBitSet(optionals, 2);
+ if (struct.isSetZeroWaitReadEnabled()) {
+ optionals.set(2);
+ }
+ oprot.writeBitSet(optionals, 3);
if (struct.isSetTxnid()) {
oprot.writeI64(struct.txnid);
}
if (struct.isSetAgentInfo()) {
oprot.writeString(struct.agentInfo);
}
+ if (struct.isSetZeroWaitReadEnabled()) {
+ oprot.writeBool(struct.zeroWaitReadEnabled);
+ }
}
@Override
@@ -845,7 +942,7 @@ import org.slf4j.LoggerFactory;
struct.setUserIsSet(true);
struct.hostname = iprot.readString();
struct.setHostnameIsSet(true);
- BitSet incoming = iprot.readBitSet(2);
+ BitSet incoming = iprot.readBitSet(3);
if (incoming.get(0)) {
struct.txnid = iprot.readI64();
struct.setTxnidIsSet(true);
@@ -854,6 +951,10 @@ import org.slf4j.LoggerFactory;
struct.agentInfo = iprot.readString();
struct.setAgentInfoIsSet(true);
}
+ if (incoming.get(2)) {
+ struct.zeroWaitReadEnabled = iprot.readBool();
+ struct.setZeroWaitReadEnabledIsSet(true);
+ }
}
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockResponse.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockResponse.java
index fdaab4b..e0f88f8 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockResponse.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/LockResponse.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
private static final org.apache.thrift.protocol.TField LOCKID_FIELD_DESC = new org.apache.thrift.protocol.TField("lockid", org.apache.thrift.protocol.TType.I64, (short)1);
private static final org.apache.thrift.protocol.TField STATE_FIELD_DESC = new org.apache.thrift.protocol.TField("state", org.apache.thrift.protocol.TType.I32, (short)2);
+ private static final org.apache.thrift.protocol.TField ERROR_MESSAGE_FIELD_DESC = new org.apache.thrift.protocol.TField("errorMessage", org.apache.thrift.protocol.TType.STRING, (short)3);
private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
static {
@@ -49,6 +50,7 @@ import org.slf4j.LoggerFactory;
private long lockid; // required
private LockState state; // required
+ private String errorMessage; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -57,7 +59,8 @@ import org.slf4j.LoggerFactory;
*
* @see LockState
*/
- STATE((short)2, "state");
+ STATE((short)2, "state"),
+ ERROR_MESSAGE((short)3, "errorMessage");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -76,6 +79,8 @@ import org.slf4j.LoggerFactory;
return LOCKID;
case 2: // STATE
return STATE;
+ case 3: // ERROR_MESSAGE
+ return ERROR_MESSAGE;
default:
return null;
}
@@ -118,6 +123,7 @@ import org.slf4j.LoggerFactory;
// isset id assignments
private static final int __LOCKID_ISSET_ID = 0;
private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.ERROR_MESSAGE};
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -125,6 +131,8 @@ import org.slf4j.LoggerFactory;
new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
tmpMap.put(_Fields.STATE, new org.apache.thrift.meta_data.FieldMetaData("state", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, LockState.class)));
+ tmpMap.put(_Fields.ERROR_MESSAGE, new org.apache.thrift.meta_data.FieldMetaData("errorMessage", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(LockResponse.class, metaDataMap);
}
@@ -151,6 +159,9 @@ import org.slf4j.LoggerFactory;
if (other.isSetState()) {
this.state = other.state;
}
+ if (other.isSetErrorMessage()) {
+ this.errorMessage = other.errorMessage;
+ }
}
public LockResponse deepCopy() {
@@ -162,6 +173,7 @@ import org.slf4j.LoggerFactory;
setLockidIsSet(false);
this.lockid = 0;
this.state = null;
+ this.errorMessage = null;
}
public long getLockid() {
@@ -217,6 +229,29 @@ import org.slf4j.LoggerFactory;
}
}
+ public String getErrorMessage() {
+ return this.errorMessage;
+ }
+
+ public void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ public void unsetErrorMessage() {
+ this.errorMessage = null;
+ }
+
+ /** Returns true if field errorMessage is set (has been assigned a value) and false otherwise */
+ public boolean isSetErrorMessage() {
+ return this.errorMessage != null;
+ }
+
+ public void setErrorMessageIsSet(boolean value) {
+ if (!value) {
+ this.errorMessage = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case LOCKID:
@@ -235,6 +270,14 @@ import org.slf4j.LoggerFactory;
}
break;
+ case ERROR_MESSAGE:
+ if (value == null) {
+ unsetErrorMessage();
+ } else {
+ setErrorMessage((String)value);
+ }
+ break;
+
}
}
@@ -246,6 +289,9 @@ import org.slf4j.LoggerFactory;
case STATE:
return getState();
+ case ERROR_MESSAGE:
+ return getErrorMessage();
+
}
throw new IllegalStateException();
}
@@ -261,6 +307,8 @@ import org.slf4j.LoggerFactory;
return isSetLockid();
case STATE:
return isSetState();
+ case ERROR_MESSAGE:
+ return isSetErrorMessage();
}
throw new IllegalStateException();
}
@@ -296,6 +344,15 @@ import org.slf4j.LoggerFactory;
return false;
}
+ boolean this_present_errorMessage = true && this.isSetErrorMessage();
+ boolean that_present_errorMessage = true && that.isSetErrorMessage();
+ if (this_present_errorMessage || that_present_errorMessage) {
+ if (!(this_present_errorMessage && that_present_errorMessage))
+ return false;
+ if (!this.errorMessage.equals(that.errorMessage))
+ return false;
+ }
+
return true;
}
@@ -313,6 +370,11 @@ import org.slf4j.LoggerFactory;
if (present_state)
list.add(state.getValue());
+ boolean present_errorMessage = true && (isSetErrorMessage());
+ list.add(present_errorMessage);
+ if (present_errorMessage)
+ list.add(errorMessage);
+
return list.hashCode();
}
@@ -344,6 +406,16 @@ import org.slf4j.LoggerFactory;
return lastComparison;
}
}
+ lastComparison = Boolean.valueOf(isSetErrorMessage()).compareTo(other.isSetErrorMessage());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetErrorMessage()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.errorMessage, other.errorMessage);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -375,6 +447,16 @@ import org.slf4j.LoggerFactory;
sb.append(this.state);
}
first = false;
+ if (isSetErrorMessage()) {
+ if (!first) sb.append(", ");
+ sb.append("errorMessage:");
+ if (this.errorMessage == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.errorMessage);
+ }
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -444,6 +526,14 @@ import org.slf4j.LoggerFactory;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 3: // ERROR_MESSAGE
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+ struct.errorMessage = iprot.readString();
+ struct.setErrorMessageIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -465,6 +555,13 @@ import org.slf4j.LoggerFactory;
oprot.writeI32(struct.state.getValue());
oprot.writeFieldEnd();
}
+ if (struct.errorMessage != null) {
+ if (struct.isSetErrorMessage()) {
+ oprot.writeFieldBegin(ERROR_MESSAGE_FIELD_DESC);
+ oprot.writeString(struct.errorMessage);
+ oprot.writeFieldEnd();
+ }
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -484,6 +581,14 @@ import org.slf4j.LoggerFactory;
TTupleProtocol oprot = (TTupleProtocol) prot;
oprot.writeI64(struct.lockid);
oprot.writeI32(struct.state.getValue());
+ BitSet optionals = new BitSet();
+ if (struct.isSetErrorMessage()) {
+ optionals.set(0);
+ }
+ oprot.writeBitSet(optionals, 1);
+ if (struct.isSetErrorMessage()) {
+ oprot.writeString(struct.errorMessage);
+ }
}
@Override
@@ -493,6 +598,11 @@ import org.slf4j.LoggerFactory;
struct.setLockidIsSet(true);
struct.state = org.apache.hadoop.hive.metastore.api.LockState.findByValue(iprot.readI32());
struct.setStateIsSet(true);
+ BitSet incoming = iprot.readBitSet(1);
+ if (incoming.get(0)) {
+ struct.errorMessage = iprot.readString();
+ struct.setErrorMessageIsSet(true);
+ }
}
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
index 92e47ed..e4b0bc7 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/Types.php
@@ -20750,6 +20750,10 @@ class LockRequest {
* @var string
*/
public $agentInfo = "Unknown";
+ /**
+ * @var bool
+ */
+ public $zeroWaitReadEnabled = false;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -20779,6 +20783,10 @@ class LockRequest {
'var' => 'agentInfo',
'type' => TType::STRING,
),
+ 6 => array(
+ 'var' => 'zeroWaitReadEnabled',
+ 'type' => TType::BOOL,
+ ),
);
}
if (is_array($vals)) {
@@ -20797,6 +20805,9 @@ class LockRequest {
if (isset($vals['agentInfo'])) {
$this->agentInfo = $vals['agentInfo'];
}
+ if (isset($vals['zeroWaitReadEnabled'])) {
+ $this->zeroWaitReadEnabled = $vals['zeroWaitReadEnabled'];
+ }
}
}
@@ -20865,6 +20876,13 @@ class LockRequest {
$xfer += $input->skip($ftype);
}
break;
+ case 6:
+ if ($ftype == TType::BOOL) {
+ $xfer += $input->readBool($this->zeroWaitReadEnabled);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -20915,6 +20933,11 @@ class LockRequest {
$xfer += $output->writeString($this->agentInfo);
$xfer += $output->writeFieldEnd();
}
+ if ($this->zeroWaitReadEnabled !== null) {
+ $xfer += $output->writeFieldBegin('zeroWaitReadEnabled', TType::BOOL, 6);
+ $xfer += $output->writeBool($this->zeroWaitReadEnabled);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
@@ -20933,6 +20956,10 @@ class LockResponse {
* @var int
*/
public $state = null;
+ /**
+ * @var string
+ */
+ public $errorMessage = null;
public function __construct($vals=null) {
if (!isset(self::$_TSPEC)) {
@@ -20945,6 +20972,10 @@ class LockResponse {
'var' => 'state',
'type' => TType::I32,
),
+ 3 => array(
+ 'var' => 'errorMessage',
+ 'type' => TType::STRING,
+ ),
);
}
if (is_array($vals)) {
@@ -20954,6 +20985,9 @@ class LockResponse {
if (isset($vals['state'])) {
$this->state = $vals['state'];
}
+ if (isset($vals['errorMessage'])) {
+ $this->errorMessage = $vals['errorMessage'];
+ }
}
}
@@ -20990,6 +21024,13 @@ class LockResponse {
$xfer += $input->skip($ftype);
}
break;
+ case 3:
+ if ($ftype == TType::STRING) {
+ $xfer += $input->readString($this->errorMessage);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -21013,6 +21054,11 @@ class LockResponse {
$xfer += $output->writeI32($this->state);
$xfer += $output->writeFieldEnd();
}
+ if ($this->errorMessage !== null) {
+ $xfer += $output->writeFieldBegin('errorMessage', TType::STRING, 3);
+ $xfer += $output->writeString($this->errorMessage);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index eda7817..1a0fee3 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -14421,6 +14421,7 @@ class LockRequest:
- user
- hostname
- agentInfo
+ - zeroWaitReadEnabled
"""
thrift_spec = (
@@ -14430,14 +14431,16 @@ class LockRequest:
(3, TType.STRING, 'user', None, None, ), # 3
(4, TType.STRING, 'hostname', None, None, ), # 4
(5, TType.STRING, 'agentInfo', None, "Unknown", ), # 5
+ (6, TType.BOOL, 'zeroWaitReadEnabled', None, False, ), # 6
)
- def __init__(self, component=None, txnid=None, user=None, hostname=None, agentInfo=thrift_spec[5][4],):
+ def __init__(self, component=None, txnid=None, user=None, hostname=None, agentInfo=thrift_spec[5][4], zeroWaitReadEnabled=thrift_spec[6][4],):
self.component = component
self.txnid = txnid
self.user = user
self.hostname = hostname
self.agentInfo = agentInfo
+ self.zeroWaitReadEnabled = zeroWaitReadEnabled
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -14479,6 +14482,11 @@ class LockRequest:
self.agentInfo = iprot.readString()
else:
iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.BOOL:
+ self.zeroWaitReadEnabled = iprot.readBool()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -14512,6 +14520,10 @@ class LockRequest:
oprot.writeFieldBegin('agentInfo', TType.STRING, 5)
oprot.writeString(self.agentInfo)
oprot.writeFieldEnd()
+ if self.zeroWaitReadEnabled is not None:
+ oprot.writeFieldBegin('zeroWaitReadEnabled', TType.BOOL, 6)
+ oprot.writeBool(self.zeroWaitReadEnabled)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -14532,6 +14544,7 @@ class LockRequest:
value = (value * 31) ^ hash(self.user)
value = (value * 31) ^ hash(self.hostname)
value = (value * 31) ^ hash(self.agentInfo)
+ value = (value * 31) ^ hash(self.zeroWaitReadEnabled)
return value
def __repr__(self):
@@ -14550,17 +14563,20 @@ class LockResponse:
Attributes:
- lockid
- state
+ - errorMessage
"""
thrift_spec = (
None, # 0
(1, TType.I64, 'lockid', None, None, ), # 1
(2, TType.I32, 'state', None, None, ), # 2
+ (3, TType.STRING, 'errorMessage', None, None, ), # 3
)
- def __init__(self, lockid=None, state=None,):
+ def __init__(self, lockid=None, state=None, errorMessage=None,):
self.lockid = lockid
self.state = state
+ self.errorMessage = errorMessage
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -14581,6 +14597,11 @@ class LockResponse:
self.state = iprot.readI32()
else:
iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.errorMessage = iprot.readString()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -14599,6 +14620,10 @@ class LockResponse:
oprot.writeFieldBegin('state', TType.I32, 2)
oprot.writeI32(self.state)
oprot.writeFieldEnd()
+ if self.errorMessage is not None:
+ oprot.writeFieldBegin('errorMessage', TType.STRING, 3)
+ oprot.writeString(self.errorMessage)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -14614,6 +14639,7 @@ class LockResponse:
value = 17
value = (value * 31) ^ hash(self.lockid)
value = (value * 31) ^ hash(self.state)
+ value = (value * 31) ^ hash(self.errorMessage)
return value
def __repr__(self):
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 86aa4f3..e6224ec 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3207,13 +3207,15 @@ class LockRequest
USER = 3
HOSTNAME = 4
AGENTINFO = 5
+ ZEROWAITREADENABLED = 6
FIELDS = {
COMPONENT => {:type => ::Thrift::Types::LIST, :name => 'component', :element => {:type => ::Thrift::Types::STRUCT, :class => ::LockComponent}},
TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid', :optional => true},
USER => {:type => ::Thrift::Types::STRING, :name => 'user'},
HOSTNAME => {:type => ::Thrift::Types::STRING, :name => 'hostname'},
- AGENTINFO => {:type => ::Thrift::Types::STRING, :name => 'agentInfo', :default => %q"Unknown", :optional => true}
+ AGENTINFO => {:type => ::Thrift::Types::STRING, :name => 'agentInfo', :default => %q"Unknown", :optional => true},
+ ZEROWAITREADENABLED => {:type => ::Thrift::Types::BOOL, :name => 'zeroWaitReadEnabled', :default => false, :optional => true}
}
def struct_fields; FIELDS; end
@@ -3231,10 +3233,12 @@ class LockResponse
include ::Thrift::Struct, ::Thrift::Struct_Union
LOCKID = 1
STATE = 2
+ ERRORMESSAGE = 3
FIELDS = {
LOCKID => {:type => ::Thrift::Types::I64, :name => 'lockid'},
- STATE => {:type => ::Thrift::Types::I32, :name => 'state', :enum_class => ::LockState}
+ STATE => {:type => ::Thrift::Types::I32, :name => 'state', :enum_class => ::LockState},
+ ERRORMESSAGE => {:type => ::Thrift::Types::STRING, :name => 'errorMessage', :optional => true}
}
def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java
index 93da0f6..b43410d 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/LockRequestBuilder.java
@@ -85,6 +85,11 @@ public class LockRequestBuilder {
return this;
}
+ public LockRequestBuilder setZeroWaitReadEnabled(boolean zeroWaitRead) {
+ req.setZeroWaitReadEnabled(zeroWaitRead);
+ return this;
+ }
+
/**
* Add a lock component to the lock request
* @param component to add
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index c21923e..8462b3d 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1080,11 +1080,13 @@ struct LockRequest {
3: required string user, // used in 'show locks' to help admins find who has open locks
4: required string hostname, // used in 'show locks' to help admins find who has open locks
5: optional string agentInfo = "Unknown",
+ 6: optional bool zeroWaitReadEnabled = false
}
struct LockResponse {
1: required i64 lockid,
2: required LockState state,
+ 3: optional string errorMessage
}
struct CheckLockRequest {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index fe39b0b..10a02b1 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -2328,7 +2328,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
public LockResponse lock(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException {
ConnectionLockIdPair connAndLockId = enqueueLockWithRetry(rqst);
try {
- return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid());
+ return checkLockWithRetry(connAndLockId.dbConn, connAndLockId.extLockId, rqst.getTxnid(),
+ rqst.isZeroWaitReadEnabled());
}
catch(NoSuchLockException e) {
// This should never happen, as we just added the lock id
@@ -2634,7 +2635,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return s == null ? null : s.toLowerCase();
}
- private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId)
+ private LockResponse checkLockWithRetry(Connection dbConn, long extLockId, long txnId, boolean zeroWaitReadEnabled)
throws NoSuchLockException, TxnAbortedException, MetaException {
try {
try {
@@ -2643,7 +2644,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//should only get here if retrying this op
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
}
- return checkLock(dbConn, extLockId, txnId);
+ return checkLock(dbConn, extLockId, txnId, zeroWaitReadEnabled);
} catch (SQLException e) {
LOG.error("checkLock failed for extLockId={}/txnId={}. Exception msg: {}", extLockId, txnId, getMessage(e));
rollbackDBConn(dbConn);
@@ -2658,7 +2659,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
catch(RetryException e) {
LOG.debug("Going to retry checkLock for extLockId={}/txnId={} after catching RetryException with message: {}",
extLockId, txnId, e.getMessage());
- return checkLockWithRetry(dbConn, extLockId, txnId);
+ return checkLockWithRetry(dbConn, extLockId, txnId, zeroWaitReadEnabled);
}
}
/**
@@ -2703,7 +2704,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//todo: strictly speaking there is a bug here. heartbeat*() commits but both heartbeat and
//checkLock() are in the same retry block, so if checkLock() throws, heartbeat is also retried
//extra heartbeat is logically harmless, but ...
- return checkLock(dbConn, extLockId, lockInfo.txnId);
+ return checkLock(dbConn, extLockId, lockInfo.txnId, false);
} catch (SQLException e) {
LOG.error("checkLock failed for request={}. Exception msg: {}", rqst, getMessage(e));
rollbackDBConn(dbConn);
@@ -4250,7 +4251,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* checkLock() will in the worst case keep locks in Waiting state a little longer.
*/
@RetrySemantics.SafeToRetry("See @SafeToRetry")
- private LockResponse checkLock(Connection dbConn, long extLockId, long txnId)
+ private LockResponse checkLock(Connection dbConn, long extLockId, long txnId, boolean zeroWaitReadEnabled)
throws NoSuchLockException, TxnAbortedException, MetaException, SQLException {
Statement stmt = null;
ResultSet rs = null;
@@ -4332,7 +4333,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
String queryStr =
- " \"EX\".*, \"REQ\".\"HL_LOCK_INT_ID\" AS \"REQ_LOCK_INT_ID\" FROM (" +
+ " \"EX\".*, \"REQ\".\"HL_LOCK_INT_ID\" \"LOCK_INT_ID\", \"REQ\".\"HL_LOCK_TYPE\" \"LOCK_TYPE\" FROM (" +
" SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\"," +
" \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\" FROM \"HIVE_LOCKS\"" +
" WHERE \"HL_LOCK_EXT_ID\" < " + extLockId + ") \"EX\"" +
@@ -4357,6 +4358,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
is performed on that db (e.g. show tables, create table, etc).
EXCLUSIVE on an object may mean it's being dropped or overwritten.*/
String[] whereStr = {
+ // shared-read
+ " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.sharedRead() + " AND \"EX\".\"HL_LOCK_TYPE\"=" +
+ LockTypeUtil.exclusive() + " AND NOT (\"EX\".\"HL_TABLE\" IS NOT NULL AND \"REQ\".\"HL_TABLE\" IS NULL)",
// exclusive
" \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.exclusive() +
" AND NOT (\"EX\".\"HL_TABLE\" IS NULL AND \"EX\".\"HL_LOCK_TYPE\"=" +
@@ -4366,10 +4370,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
LockTypeUtil.exclWrite() + "," + LockTypeUtil.exclusive() + ")",
// excl-write
" \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.exclWrite() + " AND \"EX\".\"HL_LOCK_TYPE\"!=" +
- LockTypeUtil.sharedRead(),
- // shared-read
- " \"REQ\".\"HL_LOCK_TYPE\"=" + LockTypeUtil.sharedRead() + " AND \"EX\".\"HL_LOCK_TYPE\"=" +
- LockTypeUtil.exclusive() + " AND NOT (\"EX\".\"HL_TABLE\" IS NOT NULL AND \"REQ\".\"HL_TABLE\" IS NULL)"
+ LockTypeUtil.sharedRead()
};
List<String> subQuery = new ArrayList<>();
@@ -4385,21 +4386,40 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (rs.next()) {
// We acquire all locks for a given query atomically; if 1 blocks, all remain in Waiting state.
LockInfo blockedBy = new LockInfo(rs);
- long intLockId = rs.getLong("REQ_LOCK_INT_ID");
- LOG.debug("Failure to acquire lock({} intLockId:{} {}), blocked by ({}})", JavaUtils.lockIdToString(extLockId),
- intLockId, JavaUtils.txnIdToString(txnId), blockedBy);
+ long intLockId = rs.getLong("LOCK_INT_ID");
+ char lockChar = rs.getString("LOCK_TYPE").charAt(0);
+ LOG.debug("Failure to acquire lock({} intLockId:{} {}), blocked by ({})", JavaUtils.lockIdToString(extLockId),
+ intLockId, JavaUtils.txnIdToString(txnId), blockedBy);
+
+ if (zeroWaitReadEnabled && isValidTxn(txnId)) {
+ LockType lockType = LockTypeUtil.getLockTypeFromEncoding(lockChar)
+ .orElseThrow(() -> new MetaException("Unknown lock type: " + lockChar));
+
+ if (lockType == LockType.SHARED_READ) {
+ String cleanupQuery = "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = " + extLockId;
+
+ LOG.debug("Going to execute query: <" + cleanupQuery + ">");
+ stmt.executeUpdate(cleanupQuery);
+ dbConn.commit();
+
+ response.setErrorMessage(String.format(
+ "Unable to acquire read lock due to an exclusive lock {%s}", blockedBy));
+ response.setState(LockState.NOT_ACQUIRED);
+ return response;
+ }
+ }
String updateBlockedByQuery = "UPDATE \"HIVE_LOCKS\"" +
- " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId +
- ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId +
- " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId;
+ " SET \"HL_BLOCKEDBY_EXT_ID\" = " + blockedBy.extLockId +
+ ", \"HL_BLOCKEDBY_INT_ID\" = " + blockedBy.intLockId +
+ " WHERE \"HL_LOCK_EXT_ID\" = " + extLockId + " AND \"HL_LOCK_INT_ID\" = " + intLockId;
LOG.debug("Going to execute query: <" + updateBlockedByQuery + ">");
int updCnt = stmt.executeUpdate(updateBlockedByQuery);
if (updCnt != 1) {
- LOG.error("Failure to update blocked lock (extLockId={}, intLockId={}) with the blocking lock's IDs (extLockId={}, intLockId={})",
- extLockId, intLockId, blockedBy.extLockId, blockedBy.intLockId);
+ LOG.error("Failure to update lock (extLockId={}, intLockId={}) with the blocking lock's IDs " +
+ "(extLockId={}, intLockId={})", extLockId, intLockId, blockedBy.extLockId, blockedBy.intLockId);
shouldNeverHappen(txnId, extLockId, intLockId);
}
dbConn.commit();