You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by kr...@apache.org on 2022/08/23 10:32:03 UTC
[hive] branch master updated: HIVE-26472: Allocate new writeIds during re-compilation (John Sherman, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 5de6faa8c94 HIVE-26472: Allocate new writeIds during re-compilation (John Sherman, reviewed by Denys Kuzmenko)
5de6faa8c94 is described below
commit 5de6faa8c94cabf7075b078072cb778265b7caef
Author: John Sherman <jf...@cloudera.com>
AuthorDate: Tue Aug 23 03:31:52 2022 -0700
HIVE-26472: Allocate new writeIds during re-compilation (John Sherman, reviewed by Denys Kuzmenko)
---
ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 15 +++
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 24 +++--
.../hadoop/hive/ql/lockmgr/HiveTxnManager.java | 8 ++
.../hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java | 5 +
.../gen/thrift/gen-cpp/hive_metastore_types.cpp | 22 ++++
.../src/gen/thrift/gen-cpp/hive_metastore_types.h | 13 ++-
.../api/AllocateTableWriteIdsRequest.java | 112 ++++++++++++++++++++-
.../metastore/AllocateTableWriteIdsRequest.php | 24 +++++
.../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 14 ++-
.../src/gen/thrift/gen-rb/hive_metastore_types.rb | 4 +-
.../hadoop/hive/metastore/HiveMetaStoreClient.java | 31 ++++--
.../hadoop/hive/metastore/IMetaStoreClient.java | 10 ++
.../src/main/thrift/hive_metastore.thrift | 3 +
.../hadoop/hive/metastore/txn/TxnHandler.java | 64 +++++++-----
.../metastore/HiveMetaStoreClientPreCatalog.java | 12 ++-
15 files changed, 312 insertions(+), 49 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 3565763038a..d494f409ad7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -262,6 +262,21 @@ public class Driver implements IDriver {
String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
driverContext.getTxnManager().openTxn(context, userFromUGI, driverContext.getTxnType());
lockAndRespond();
+ } else {
+ // We need to clear the possibly cached writeIds for the prior transaction, so new writeIds
+ // are allocated since writeIds need to be committed in increasing order. It helps in cases
+ // like:
+ // txnId writeId
+ // 10 71 <--- commit first
+ // 11 69
+ // 12 70
+ // in which the transaction is not out of date, but the writeId would not be increasing.
+ // This would be a problem in an UPDATE, since it would end up generating delete
+ // deltas for a future writeId - which in turn causes scans to not think they are deleted.
+ // The scan basically does last writer wins for a given row which is determined by
+ // max(committingWriteId) for a given ROW__ID(originalWriteId, bucketId, rowId). So the
+ // data add ends up being greater than the data delete.
+ driverContext.getTxnManager().clearCaches();
}
driverContext.setRetrial(true);
driverContext.getBackupContext().addSubContext(context);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 40ff171db22..de8b3a45df4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -110,6 +110,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
* The local cache of table write IDs allocated/created by the current transaction
*/
private Map<String, Long> tableWriteIds = new HashMap<>();
+ private boolean shouldReallocateWriteIds = false;
/**
* assigns a unique monotonically increasing ID to each statement
@@ -229,6 +230,13 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
return openTxn(ctx, user, txnType, 0);
}
+ @Override
+ public void clearCaches() {
+ LOG.info("Clearing writeId cache for {}", JavaUtils.txnIdToString(txnId));
+ tableWriteIds.clear();
+ shouldReallocateWriteIds = true;
+ }
+
@VisibleForTesting
long openTxn(Context ctx, String user, TxnType txnType, long delay) throws LockException {
/*Q: why don't we lock the snapshot here??? Instead of having client make an explicit call
@@ -251,6 +259,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
stmtId = 0;
numStatements = 0;
tableWriteIds.clear();
+ shouldReallocateWriteIds = false;
isExplicitTransaction = false;
startTransactionCount = 0;
this.queryId = ctx.getConf().get(HiveConf.ConfVars.HIVEQUERYID.varname);
@@ -491,6 +500,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
stmtId = -1;
numStatements = 0;
tableWriteIds.clear();
+ shouldReallocateWriteIds = false;
queryId = null;
replPolicy = null;
}
@@ -933,10 +943,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
assert isTxnOpen();
return stmtId;
}
+
@Override
public long getTableWriteId(String dbName, String tableName) throws LockException {
assert isTxnOpen();
- return getTableWriteId(dbName, tableName, true);
+ return getTableWriteId(dbName, tableName, true, shouldReallocateWriteIds);
}
@Override
@@ -946,7 +957,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
// to return 0 if the dbName:tableName's writeId is not yet allocated.
// This happens when the current context is before
// Driver.acquireLocks() is called.
- return getTableWriteId(dbName, tableName, false);
+ return getTableWriteId(dbName, tableName, false, false);
}
@Override
@@ -959,8 +970,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
}
}
- private long getTableWriteId(
- String dbName, String tableName, boolean allocateIfNotYet) throws LockException {
+ private long getTableWriteId(String dbName, String tableName, boolean allocateIfNotYet,
+ boolean shouldReallocate) throws LockException {
String fullTableName = AcidUtils.getFullTableName(dbName, tableName);
if (tableWriteIds.containsKey(fullTableName)) {
return tableWriteIds.get(fullTableName);
@@ -968,8 +979,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
return 0;
}
try {
- long writeId = getMS().allocateTableWriteId(txnId, dbName, tableName);
- LOG.debug("Allocated write ID {} for {}.{}", writeId, dbName, tableName);
+ long writeId = getMS().allocateTableWriteId(txnId, dbName, tableName, shouldReallocate);
+ LOG.info("Allocated write ID {} for {}.{} and {} (shouldReallocate: {}) ", writeId, dbName,
+ tableName, JavaUtils.txnIdToString(txnId), shouldReallocate);
tableWriteIds.put(fullTableName, writeId);
return writeId;
} catch (TException e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 24deac59281..b114414e9c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -347,6 +347,14 @@ public interface HiveTxnManager {
// Can be used by operation to set the stmt id when allocation is done somewhere else.
int getCurrentStmtId();
+ /**
+ * Reset locally cached information.
+ * This is called before re-compilation after acquiring the lock if the transaction is not
+ * outdated. The intent is to clear any cached information such as WriteIds (but not
+ * resetting/rolling back the overall transaction).
+ */
+ void clearCaches();
+
/**
* Acquire the materialization rebuild lock for a given view. We need to specify the fully
* qualified name of the materialized view and the open transaction ID so we can identify
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index 9d5f9aa4107..9897795db9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -71,6 +71,11 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager, Configurable {
destruct();
}
+ @Override
+ public void clearCaches() {
+ // no op - implementations should override as needed
+ }
+
@Override
public void acquireLocks(QueryPlan plan, Context ctx, String username, DriverState driverState) throws LockException {
acquireLocks(plan, ctx, username);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index e999500532a..c031ccb561b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -23849,6 +23849,11 @@ void AllocateTableWriteIdsRequest::__set_srcTxnToWriteIdList(const std::vector<T
this->srcTxnToWriteIdList = val;
__isset.srcTxnToWriteIdList = true;
}
+
+void AllocateTableWriteIdsRequest::__set_reallocate(const bool val) {
+ this->reallocate = val;
+__isset.reallocate = true;
+}
std::ostream& operator<<(std::ostream& out, const AllocateTableWriteIdsRequest& obj)
{
obj.printTo(out);
@@ -23943,6 +23948,14 @@ uint32_t AllocateTableWriteIdsRequest::read(::apache::thrift::protocol::TProtoco
xfer += iprot->skip(ftype);
}
break;
+ case 6:
+ if (ftype == ::apache::thrift::protocol::T_BOOL) {
+ xfer += iprot->readBool(this->reallocate);
+ this->__isset.reallocate = true;
+ } else {
+ xfer += iprot->skip(ftype);
+ }
+ break;
default:
xfer += iprot->skip(ftype);
break;
@@ -24003,6 +24016,11 @@ uint32_t AllocateTableWriteIdsRequest::write(::apache::thrift::protocol::TProtoc
}
xfer += oprot->writeFieldEnd();
}
+ if (this->__isset.reallocate) {
+ xfer += oprot->writeFieldBegin("reallocate", ::apache::thrift::protocol::T_BOOL, 6);
+ xfer += oprot->writeBool(this->reallocate);
+ xfer += oprot->writeFieldEnd();
+ }
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
@@ -24015,6 +24033,7 @@ void swap(AllocateTableWriteIdsRequest &a, AllocateTableWriteIdsRequest &b) {
swap(a.txnIds, b.txnIds);
swap(a.replPolicy, b.replPolicy);
swap(a.srcTxnToWriteIdList, b.srcTxnToWriteIdList);
+ swap(a.reallocate, b.reallocate);
swap(a.__isset, b.__isset);
}
@@ -24024,6 +24043,7 @@ AllocateTableWriteIdsRequest::AllocateTableWriteIdsRequest(const AllocateTableWr
txnIds = other897.txnIds;
replPolicy = other897.replPolicy;
srcTxnToWriteIdList = other897.srcTxnToWriteIdList;
+ reallocate = other897.reallocate;
__isset = other897.__isset;
}
AllocateTableWriteIdsRequest& AllocateTableWriteIdsRequest::operator=(const AllocateTableWriteIdsRequest& other898) {
@@ -24032,6 +24052,7 @@ AllocateTableWriteIdsRequest& AllocateTableWriteIdsRequest::operator=(const Allo
txnIds = other898.txnIds;
replPolicy = other898.replPolicy;
srcTxnToWriteIdList = other898.srcTxnToWriteIdList;
+ reallocate = other898.reallocate;
__isset = other898.__isset;
return *this;
}
@@ -24043,6 +24064,7 @@ void AllocateTableWriteIdsRequest::printTo(std::ostream& out) const {
out << ", " << "txnIds="; (__isset.txnIds ? (out << to_string(txnIds)) : (out << "<null>"));
out << ", " << "replPolicy="; (__isset.replPolicy ? (out << to_string(replPolicy)) : (out << "<null>"));
out << ", " << "srcTxnToWriteIdList="; (__isset.srcTxnToWriteIdList ? (out << to_string(srcTxnToWriteIdList)) : (out << "<null>"));
+ out << ", " << "reallocate="; (__isset.reallocate ? (out << to_string(reallocate)) : (out << "<null>"));
out << ")";
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index e5b1b3512be..152b99a4e0b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -9489,10 +9489,11 @@ void swap(TxnToWriteId &a, TxnToWriteId &b);
std::ostream& operator<<(std::ostream& out, const TxnToWriteId& obj);
typedef struct _AllocateTableWriteIdsRequest__isset {
- _AllocateTableWriteIdsRequest__isset() : txnIds(false), replPolicy(false), srcTxnToWriteIdList(false) {}
+ _AllocateTableWriteIdsRequest__isset() : txnIds(false), replPolicy(false), srcTxnToWriteIdList(false), reallocate(true) {}
bool txnIds :1;
bool replPolicy :1;
bool srcTxnToWriteIdList :1;
+ bool reallocate :1;
} _AllocateTableWriteIdsRequest__isset;
class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
@@ -9503,7 +9504,8 @@ class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
AllocateTableWriteIdsRequest() noexcept
: dbName(),
tableName(),
- replPolicy() {
+ replPolicy(),
+ reallocate(false) {
}
virtual ~AllocateTableWriteIdsRequest() noexcept;
@@ -9512,6 +9514,7 @@ class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
std::vector<int64_t> txnIds;
std::string replPolicy;
std::vector<TxnToWriteId> srcTxnToWriteIdList;
+ bool reallocate;
_AllocateTableWriteIdsRequest__isset __isset;
@@ -9525,6 +9528,8 @@ class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
void __set_srcTxnToWriteIdList(const std::vector<TxnToWriteId> & val);
+ void __set_reallocate(const bool val);
+
bool operator == (const AllocateTableWriteIdsRequest & rhs) const
{
if (!(dbName == rhs.dbName))
@@ -9543,6 +9548,10 @@ class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
return false;
else if (__isset.srcTxnToWriteIdList && !(srcTxnToWriteIdList == rhs.srcTxnToWriteIdList))
return false;
+ if (__isset.reallocate != rhs.__isset.reallocate)
+ return false;
+ else if (__isset.reallocate && !(reallocate == rhs.reallocate))
+ return false;
return true;
}
bool operator != (const AllocateTableWriteIdsRequest &rhs) const {
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsRequest.java
index 58b31cce26b..b94c793c7ff 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsRequest.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.metastore.api;
private static final org.apache.thrift.protocol.TField TXN_IDS_FIELD_DESC = new org.apache.thrift.protocol.TField("txnIds", org.apache.thrift.protocol.TType.LIST, (short)3);
private static final org.apache.thrift.protocol.TField REPL_POLICY_FIELD_DESC = new org.apache.thrift.protocol.TField("replPolicy", org.apache.thrift.protocol.TType.STRING, (short)4);
private static final org.apache.thrift.protocol.TField SRC_TXN_TO_WRITE_ID_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("srcTxnToWriteIdList", org.apache.thrift.protocol.TType.LIST, (short)5);
+ private static final org.apache.thrift.protocol.TField REALLOCATE_FIELD_DESC = new org.apache.thrift.protocol.TField("reallocate", org.apache.thrift.protocol.TType.BOOL, (short)6);
private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new AllocateTableWriteIdsRequestStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new AllocateTableWriteIdsRequestTupleSchemeFactory();
@@ -25,6 +26,7 @@ package org.apache.hadoop.hive.metastore.api;
private @org.apache.thrift.annotation.Nullable java.util.List<java.lang.Long> txnIds; // optional
private @org.apache.thrift.annotation.Nullable java.lang.String replPolicy; // optional
private @org.apache.thrift.annotation.Nullable java.util.List<TxnToWriteId> srcTxnToWriteIdList; // optional
+ private boolean reallocate; // optional
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -32,7 +34,8 @@ package org.apache.hadoop.hive.metastore.api;
TABLE_NAME((short)2, "tableName"),
TXN_IDS((short)3, "txnIds"),
REPL_POLICY((short)4, "replPolicy"),
- SRC_TXN_TO_WRITE_ID_LIST((short)5, "srcTxnToWriteIdList");
+ SRC_TXN_TO_WRITE_ID_LIST((short)5, "srcTxnToWriteIdList"),
+ REALLOCATE((short)6, "reallocate");
private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
@@ -58,6 +61,8 @@ package org.apache.hadoop.hive.metastore.api;
return REPL_POLICY;
case 5: // SRC_TXN_TO_WRITE_ID_LIST
return SRC_TXN_TO_WRITE_ID_LIST;
+ case 6: // REALLOCATE
+ return REALLOCATE;
default:
return null;
}
@@ -99,7 +104,9 @@ package org.apache.hadoop.hive.metastore.api;
}
// isset id assignments
- private static final _Fields optionals[] = {_Fields.TXN_IDS,_Fields.REPL_POLICY,_Fields.SRC_TXN_TO_WRITE_ID_LIST};
+ private static final int __REALLOCATE_ISSET_ID = 0;
+ private byte __isset_bitfield = 0;
+ private static final _Fields optionals[] = {_Fields.TXN_IDS,_Fields.REPL_POLICY,_Fields.SRC_TXN_TO_WRITE_ID_LIST,_Fields.REALLOCATE};
public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -115,11 +122,15 @@ package org.apache.hadoop.hive.metastore.api;
tmpMap.put(_Fields.SRC_TXN_TO_WRITE_ID_LIST, new org.apache.thrift.meta_data.FieldMetaData("srcTxnToWriteIdList", org.apache.thrift.TFieldRequirementType.OPTIONAL,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TxnToWriteId.class))));
+ tmpMap.put(_Fields.REALLOCATE, new org.apache.thrift.meta_data.FieldMetaData("reallocate", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AllocateTableWriteIdsRequest.class, metaDataMap);
}
public AllocateTableWriteIdsRequest() {
+ this.reallocate = false;
+
}
public AllocateTableWriteIdsRequest(
@@ -135,6 +146,7 @@ package org.apache.hadoop.hive.metastore.api;
* Performs a deep copy on <i>other</i>.
*/
public AllocateTableWriteIdsRequest(AllocateTableWriteIdsRequest other) {
+ __isset_bitfield = other.__isset_bitfield;
if (other.isSetDbName()) {
this.dbName = other.dbName;
}
@@ -155,6 +167,7 @@ package org.apache.hadoop.hive.metastore.api;
}
this.srcTxnToWriteIdList = __this__srcTxnToWriteIdList;
}
+ this.reallocate = other.reallocate;
}
public AllocateTableWriteIdsRequest deepCopy() {
@@ -168,6 +181,8 @@ package org.apache.hadoop.hive.metastore.api;
this.txnIds = null;
this.replPolicy = null;
this.srcTxnToWriteIdList = null;
+ this.reallocate = false;
+
}
@org.apache.thrift.annotation.Nullable
@@ -322,6 +337,28 @@ package org.apache.hadoop.hive.metastore.api;
}
}
+ public boolean isReallocate() {
+ return this.reallocate;
+ }
+
+ public void setReallocate(boolean reallocate) {
+ this.reallocate = reallocate;
+ setReallocateIsSet(true);
+ }
+
+ public void unsetReallocate() {
+ __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __REALLOCATE_ISSET_ID);
+ }
+
+ /** Returns true if field reallocate is set (has been assigned a value) and false otherwise */
+ public boolean isSetReallocate() {
+ return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __REALLOCATE_ISSET_ID);
+ }
+
+ public void setReallocateIsSet(boolean value) {
+ __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __REALLOCATE_ISSET_ID, value);
+ }
+
public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
case DB_NAME:
@@ -364,6 +401,14 @@ package org.apache.hadoop.hive.metastore.api;
}
break;
+ case REALLOCATE:
+ if (value == null) {
+ unsetReallocate();
+ } else {
+ setReallocate((java.lang.Boolean)value);
+ }
+ break;
+
}
}
@@ -385,6 +430,9 @@ package org.apache.hadoop.hive.metastore.api;
case SRC_TXN_TO_WRITE_ID_LIST:
return getSrcTxnToWriteIdList();
+ case REALLOCATE:
+ return isReallocate();
+
}
throw new java.lang.IllegalStateException();
}
@@ -406,6 +454,8 @@ package org.apache.hadoop.hive.metastore.api;
return isSetReplPolicy();
case SRC_TXN_TO_WRITE_ID_LIST:
return isSetSrcTxnToWriteIdList();
+ case REALLOCATE:
+ return isSetReallocate();
}
throw new java.lang.IllegalStateException();
}
@@ -468,6 +518,15 @@ package org.apache.hadoop.hive.metastore.api;
return false;
}
+ boolean this_present_reallocate = true && this.isSetReallocate();
+ boolean that_present_reallocate = true && that.isSetReallocate();
+ if (this_present_reallocate || that_present_reallocate) {
+ if (!(this_present_reallocate && that_present_reallocate))
+ return false;
+ if (this.reallocate != that.reallocate)
+ return false;
+ }
+
return true;
}
@@ -495,6 +554,10 @@ package org.apache.hadoop.hive.metastore.api;
if (isSetSrcTxnToWriteIdList())
hashCode = hashCode * 8191 + srcTxnToWriteIdList.hashCode();
+ hashCode = hashCode * 8191 + ((isSetReallocate()) ? 131071 : 524287);
+ if (isSetReallocate())
+ hashCode = hashCode * 8191 + ((reallocate) ? 131071 : 524287);
+
return hashCode;
}
@@ -556,6 +619,16 @@ package org.apache.hadoop.hive.metastore.api;
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.compare(isSetReallocate(), other.isSetReallocate());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetReallocate()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.reallocate, other.reallocate);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -622,6 +695,12 @@ package org.apache.hadoop.hive.metastore.api;
}
first = false;
}
+ if (isSetReallocate()) {
+ if (!first) sb.append(", ");
+ sb.append("reallocate:");
+ sb.append(this.reallocate);
+ first = false;
+ }
sb.append(")");
return sb.toString();
}
@@ -649,6 +728,8 @@ package org.apache.hadoop.hive.metastore.api;
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bitfield = 0;
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
@@ -734,6 +815,14 @@ package org.apache.hadoop.hive.metastore.api;
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
break;
+ case 6: // REALLOCATE
+ if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+ struct.reallocate = iprot.readBool();
+ struct.setReallocateIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
}
@@ -792,6 +881,11 @@ package org.apache.hadoop.hive.metastore.api;
oprot.writeFieldEnd();
}
}
+ if (struct.isSetReallocate()) {
+ oprot.writeFieldBegin(REALLOCATE_FIELD_DESC);
+ oprot.writeBool(struct.reallocate);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -821,7 +915,10 @@ package org.apache.hadoop.hive.metastore.api;
if (struct.isSetSrcTxnToWriteIdList()) {
optionals.set(2);
}
- oprot.writeBitSet(optionals, 3);
+ if (struct.isSetReallocate()) {
+ optionals.set(3);
+ }
+ oprot.writeBitSet(optionals, 4);
if (struct.isSetTxnIds()) {
{
oprot.writeI32(struct.txnIds.size());
@@ -843,6 +940,9 @@ package org.apache.hadoop.hive.metastore.api;
}
}
}
+ if (struct.isSetReallocate()) {
+ oprot.writeBool(struct.reallocate);
+ }
}
@Override
@@ -852,7 +952,7 @@ package org.apache.hadoop.hive.metastore.api;
struct.setDbNameIsSet(true);
struct.tableName = iprot.readString();
struct.setTableNameIsSet(true);
- java.util.BitSet incoming = iprot.readBitSet(3);
+ java.util.BitSet incoming = iprot.readBitSet(4);
if (incoming.get(0)) {
{
org.apache.thrift.protocol.TList _list808 = iprot.readListBegin(org.apache.thrift.protocol.TType.I64);
@@ -884,6 +984,10 @@ package org.apache.hadoop.hive.metastore.api;
}
struct.setSrcTxnToWriteIdListIsSet(true);
}
+ if (incoming.get(3)) {
+ struct.reallocate = iprot.readBool();
+ struct.setReallocateIsSet(true);
+ }
}
}
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/AllocateTableWriteIdsRequest.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/AllocateTableWriteIdsRequest.php
index af402a9afff..1d43e8d3f9c 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/AllocateTableWriteIdsRequest.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/AllocateTableWriteIdsRequest.php
@@ -55,6 +55,11 @@ class AllocateTableWriteIdsRequest
'class' => '\metastore\TxnToWriteId',
),
),
+ 6 => array(
+ 'var' => 'reallocate',
+ 'isRequired' => false,
+ 'type' => TType::BOOL,
+ ),
);
/**
@@ -77,6 +82,10 @@ class AllocateTableWriteIdsRequest
* @var \metastore\TxnToWriteId[]
*/
public $srcTxnToWriteIdList = null;
+ /**
+ * @var bool
+ */
+ public $reallocate = false;
public function __construct($vals = null)
{
@@ -96,6 +105,9 @@ class AllocateTableWriteIdsRequest
if (isset($vals['srcTxnToWriteIdList'])) {
$this->srcTxnToWriteIdList = $vals['srcTxnToWriteIdList'];
}
+ if (isset($vals['reallocate'])) {
+ $this->reallocate = $vals['reallocate'];
+ }
}
}
@@ -172,6 +184,13 @@ class AllocateTableWriteIdsRequest
$xfer += $input->skip($ftype);
}
break;
+ case 6:
+ if ($ftype == TType::BOOL) {
+ $xfer += $input->readBool($this->reallocate);
+ } else {
+ $xfer += $input->skip($ftype);
+ }
+ break;
default:
$xfer += $input->skip($ftype);
break;
@@ -225,6 +244,11 @@ class AllocateTableWriteIdsRequest
$output->writeListEnd();
$xfer += $output->writeFieldEnd();
}
+ if ($this->reallocate !== null) {
+ $xfer += $output->writeFieldBegin('reallocate', TType::BOOL, 6);
+ $xfer += $output->writeBool($this->reallocate);
+ $xfer += $output->writeFieldEnd();
+ }
$xfer += $output->writeFieldStop();
$xfer += $output->writeStructEnd();
return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 80b02ec1437..450de49e2e8 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -13651,16 +13651,18 @@ class AllocateTableWriteIdsRequest(object):
- txnIds
- replPolicy
- srcTxnToWriteIdList
+ - reallocate
"""
- def __init__(self, dbName=None, tableName=None, txnIds=None, replPolicy=None, srcTxnToWriteIdList=None,):
+ def __init__(self, dbName=None, tableName=None, txnIds=None, replPolicy=None, srcTxnToWriteIdList=None, reallocate=False,):
self.dbName = dbName
self.tableName = tableName
self.txnIds = txnIds
self.replPolicy = replPolicy
self.srcTxnToWriteIdList = srcTxnToWriteIdList
+ self.reallocate = reallocate
def read(self, iprot):
if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -13707,6 +13709,11 @@ class AllocateTableWriteIdsRequest(object):
iprot.readListEnd()
else:
iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.BOOL:
+ self.reallocate = iprot.readBool()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -13743,6 +13750,10 @@ class AllocateTableWriteIdsRequest(object):
iter715.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
+ if self.reallocate is not None:
+ oprot.writeFieldBegin('reallocate', TType.BOOL, 6)
+ oprot.writeBool(self.reallocate)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -30797,6 +30808,7 @@ AllocateTableWriteIdsRequest.thrift_spec = (
(3, TType.LIST, 'txnIds', (TType.I64, None, False), None, ), # 3
(4, TType.STRING, 'replPolicy', 'UTF8', None, ), # 4
(5, TType.LIST, 'srcTxnToWriteIdList', (TType.STRUCT, [TxnToWriteId, None], False), None, ), # 5
+ (6, TType.BOOL, 'reallocate', None, False, ), # 6
)
all_structs.append(AllocateTableWriteIdsResponse)
AllocateTableWriteIdsResponse.thrift_spec = (
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 065e2642dc4..929b8cd7147 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -4034,13 +4034,15 @@ class AllocateTableWriteIdsRequest
TXNIDS = 3
REPLPOLICY = 4
SRCTXNTOWRITEIDLIST = 5
+ REALLOCATE = 6
FIELDS = {
DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
TXNIDS => {:type => ::Thrift::Types::LIST, :name => 'txnIds', :element => {:type => ::Thrift::Types::I64}, :optional => true},
REPLPOLICY => {:type => ::Thrift::Types::STRING, :name => 'replPolicy', :optional => true},
- SRCTXNTOWRITEIDLIST => {:type => ::Thrift::Types::LIST, :name => 'srcTxnToWriteIdList', :element => {:type => ::Thrift::Types::STRUCT, :class => ::TxnToWriteId}, :optional => true}
+ SRCTXNTOWRITEIDLIST => {:type => ::Thrift::Types::LIST, :name => 'srcTxnToWriteIdList', :element => {:type => ::Thrift::Types::STRUCT, :class => ::TxnToWriteId}, :optional => true},
+ REALLOCATE => {:type => ::Thrift::Types::BOOL, :name => 'reallocate', :default => false, :optional => true}
}
def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 93229b95007..3cfbcba027d 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -115,7 +115,7 @@ import com.google.common.collect.Lists;
public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
private final String CLASS_NAME = HiveMetaStoreClient.class.getName();
-
+
public static final String MANUALLY_INITIATED_COMPACTION = "manual";
public static final String TRUNCATE_SKIP_DATA_DELETION = "truncateSkipDataDeletion";
public static final String RENAME_PARTITION_MAKE_COPY = "renamePartitionMakeCopy";
@@ -1670,7 +1670,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
* @param maxBatchSize
* @throws TException
*/
- private void dropDatabaseCascadePerTable(DropDatabaseRequest req, List<String> tableList, int maxBatchSize)
+ private void dropDatabaseCascadePerTable(DropDatabaseRequest req, List<String> tableList, int maxBatchSize)
throws TException {
String dbNameWithCatalog = prependCatalogToDbName(req.getCatalogName(), req.getName(), conf);
for (Table table : new TableIterable(
@@ -1689,7 +1689,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
context.putToProperties(hive_metastoreConstants.TXN_ID, String.valueOf(req.getTxnId()));
req.setDeleteManagedDir(false);
}
- client.drop_table_with_environment_context(dbNameWithCatalog, table.getTableName(),
+ client.drop_table_with_environment_context(dbNameWithCatalog, table.getTableName(),
req.isDeleteData() && !isSoftDelete, context);
if (hook != null) {
hook.commitDropTable(table, req.isDeleteData());
@@ -1877,7 +1877,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
req.setDeleteData(options.deleteData);
req.setNeedResult(options.returnResults);
req.setIfExists(options.ifExists);
-
+
EnvironmentContext context = null;
if (options.purgeData) {
LOG.info("Dropped partitions will be purged!");
@@ -1892,7 +1892,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
context.putToProperties(hive_metastoreConstants.TXN_ID, options.txnId.toString());
}
req.setEnvironmentContext(context);
-
+
return client.drop_partitions_req(req).getPartitions();
}
@@ -1921,7 +1921,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
}
String catName = Optional.ofNullable(tbl.getCatName()).orElse(getDefaultCatalog(conf));
- dropTable(catName, tbl.getDbName(), tbl.getTableName(), deleteData,
+ dropTable(catName, tbl.getDbName(), tbl.getTableName(), deleteData,
ignoreUnknownTbl, context);
}
@@ -2001,7 +2001,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
truncateTableInternal(getDefaultCatalog(conf),
dbName, tableName, partNames, validWriteIds, writeId, deleteData);
}
-
+
@Override
public void truncateTable(String dbName, String tableName, List<String> partNames,
String validWriteIds, long writeId) throws TException {
@@ -4053,14 +4053,25 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
@Override
public long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException {
- return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName).get(0).getWriteId();
+ return allocateTableWriteId(txnId, dbName, tableName, false);
}
@Override
- public List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName)
- throws TException {
+ public long allocateTableWriteId(long txnId, String dbName, String tableName, boolean shouldRealloc) throws TException {
+ return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName, shouldRealloc).get(0).getWriteId();
+ }
+
+
+ @Override
+ public List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName) throws TException {
+ return allocateTableWriteIdsBatch(txnIds, dbName, tableName, false);
+ }
+
+ private List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName,
+ boolean shouldRealloc) throws TException {
AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tableName);
rqst.setTxnIds(txnIds);
+ rqst.setReallocate(shouldRealloc);
return allocateTableWriteIdsBatchIntr(rqst);
}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 9bde71ffa61..576bf99698c 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -3236,6 +3236,16 @@ public interface IMetaStoreClient {
*/
long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException;
+ /**
+ * Allocate a per table write ID and associate it with the given transaction.
+ * @param txnId id of the transaction with which the allocated write ID is to be associated.
+ * @param dbName name of the DB to which the table belongs.
+ * @param tableName table for which the write ID is to be allocated.
+ * @param reallocate if true, drop any writeId already mapped to the transaction and allocate a new one; if false, reuse it
+ * @throws TException
+ */
+ long allocateTableWriteId(long txnId, String dbName, String tableName, boolean reallocate) throws TException;
+
/**
* Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
* @param validWriteIdList Snapshot of writeid list when the table/partition is dumped.
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 4889d3a4ed9..0b01a7a796d 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1166,6 +1166,9 @@ struct AllocateTableWriteIdsRequest {
4: optional string replPolicy,
// The list is assumed to be sorted by both txnids and write ids. The write id list is assumed to be contiguous.
5: optional list<TxnToWriteId> srcTxnToWriteIdList,
+ // If false, reuse previously allocated writeIds for txnIds. If true, remove older txnId to writeId mappings
+ // and regenerate (this is useful during re-compilation when we need to ensure writeIds are regenerated)
+ 6: optional bool reallocate = false;
}
struct AllocateTableWriteIdsResponse {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 5ab11b1c6bc..ec6870b763e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -2123,6 +2123,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
List<Long> txnIds;
String dbName = rqst.getDbName().toLowerCase();
String tblName = rqst.getTableName().toLowerCase();
+ boolean shouldReallocate = rqst.isReallocate();
try {
Connection dbConn = null;
PreparedStatement pStmt = null;
@@ -2180,33 +2181,48 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
-
- // Traverse the TXN_TO_WRITE_ID to see if any of the input txns already have allocated a
- // write id for the same db.table. If yes, then need to reuse it else have to allocate new one
- // The write id would have been already allocated in case of multi-statement txns where
- // first write on a table will allocate write id and rest of the writes should re-use it.
- prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE")
- .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND ");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
- txnIds, "\"T2W_TXNID\"", false, false);
-
- long allocatedTxnsCount = 0;
long writeId;
+ int allocatedTxnsCount = 0;
List<String> params = Arrays.asList(dbName, tblName);
- for (String query : queries) {
- pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
- LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">",
- quoteString(dbName), quoteString(tblName));
- rs = pStmt.executeQuery();
- while (rs.next()) {
- // If table write ID is already allocated for the given transaction, then just use it
- long txnId = rs.getLong(1);
- writeId = rs.getLong(2);
- txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
- allocatedTxnsCount++;
- LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId);
+ if (shouldReallocate) {
+ // during query recompilation after lock acquisition, it is important to reallocate new writeIds
+ // to ensure writeIds are committed in increasing order.
+ prefix.append("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE")
+ .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND ");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+ txnIds, "\"T2W_TXNID\"", false, false);
+ for (String query : queries) {
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
+ LOG.debug("Going to execute delete <" + query.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ int numRowsDeleted = pStmt.executeUpdate();
+ LOG.info("Removed {} prior writeIds during reallocation", numRowsDeleted);
+ closeStmt(pStmt);
+ }
+ } else {
+ // Traverse the TXN_TO_WRITE_ID to see if any of the input txns already have allocated a
+ // write id for the same db.table. If yes, then need to reuse it else have to allocate new one
+ // The write id would have been already allocated in case of multi-statement txns where
+ // first write on a table will allocate write id and rest of the writes should re-use it.
+ prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE")
+ .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND ");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+ txnIds, "\"T2W_TXNID\"", false, false);
+ for (String query : queries) {
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
+ LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ rs = pStmt.executeQuery();
+ while (rs.next()) {
+ // If table write ID is already allocated for the given transaction, then just use it
+ long txnId = rs.getLong(1);
+ writeId = rs.getLong(2);
+ txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
+ allocatedTxnsCount++;
+ LOG.info("Reused already allocated writeID: {} for txnId: {}", writeId, txnId);
+ }
+ closeStmt(pStmt);
}
- closeStmt(pStmt);
}
// Batch allocation should always happen atomically. Either write ids for all txns is allocated or none.
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index a21ee727890..31a3d65a11c 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2542,17 +2542,27 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
}
client.repl_tbl_writeid_state(rqst);
}
+ @Override
+ public long allocateTableWriteId(long txnId, String dbName, String tableName, boolean shouldRealloc) throws TException {
+ return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName, shouldRealloc).get(0).getWriteId();
+ }
@Override
public long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException {
- return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName).get(0).getWriteId();
+ return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName, false).get(0).getWriteId();
}
@Override
public List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName)
throws TException {
+ return allocateTableWriteIdsBatch(txnIds, dbName, tableName, false);
+ }
+
+ public List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName, boolean shouldRealloc)
+ throws TException {
AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tableName);
rqst.setTxnIds(txnIds);
+ rqst.setReallocate(shouldRealloc);
return allocateTableWriteIdsBatchIntr(rqst);
}