Posted to commits@hive.apache.org by kr...@apache.org on 2022/08/23 10:32:03 UTC

[hive] branch master updated: HIVE-26472: Allocate new writeIds during re-compilation (John Sherman, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5de6faa8c94 HIVE-26472: Allocate new writeIds during re-compilation (John Sherman, reviewed by Denys Kuzmenko)
5de6faa8c94 is described below

commit 5de6faa8c94cabf7075b078072cb778265b7caef
Author: John Sherman <jf...@cloudera.com>
AuthorDate: Tue Aug 23 03:31:52 2022 -0700

    HIVE-26472: Allocate new writeIds during re-compilation (John Sherman, reviewed by Denys Kuzmenko)
---
 ql/src/java/org/apache/hadoop/hive/ql/Driver.java  |  15 +++
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java       |  24 +++--
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java     |   8 ++
 .../hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java |   5 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp    |  22 ++++
 .../src/gen/thrift/gen-cpp/hive_metastore_types.h  |  13 ++-
 .../api/AllocateTableWriteIdsRequest.java          | 112 ++++++++++++++++++++-
 .../metastore/AllocateTableWriteIdsRequest.php     |  24 +++++
 .../src/gen/thrift/gen-py/hive_metastore/ttypes.py |  14 ++-
 .../src/gen/thrift/gen-rb/hive_metastore_types.rb  |   4 +-
 .../hadoop/hive/metastore/HiveMetaStoreClient.java |  31 ++++--
 .../hadoop/hive/metastore/IMetaStoreClient.java    |  10 ++
 .../src/main/thrift/hive_metastore.thrift          |   3 +
 .../hadoop/hive/metastore/txn/TxnHandler.java      |  64 +++++++-----
 .../metastore/HiveMetaStoreClientPreCatalog.java   |  12 ++-
 15 files changed, 312 insertions(+), 49 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 3565763038a..d494f409ad7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -262,6 +262,21 @@ public class Driver implements IDriver {
             String userFromUGI = DriverUtils.getUserFromUGI(driverContext);
             driverContext.getTxnManager().openTxn(context, userFromUGI, driverContext.getTxnType());
             lockAndRespond();
+          } else {
+            // We need to clear the writeIds possibly cached during the prior compilation, so that new
+            // writeIds are allocated, since writeIds need to be committed in increasing order. It
+            // helps in cases like:
+            // txnId   writeId
+            // 10      71  <--- commit first
+            // 11      69
+            // 12      70
+            // in which the transaction is not out of date, but the writeId would not be increasing.
+            // This would be a problem for an UPDATE, since it would end up generating delete
+            // deltas for a future writeId - which in turn causes scans to not treat those rows as
+            // deleted. The scan basically does last-writer-wins for a given row, determined by
+            // max(committingWriteId) for a given ROW__ID(originalWriteId, bucketId, rowId), so the
+            // data add ends up with a higher writeId than the data delete.
+            driverContext.getTxnManager().clearCaches();
           }
           driverContext.setRetrial(true);
           driverContext.getBackupContext().addSubContext(context);
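
To make the ordering problem described in the new comment concrete: the reader resolves each ROW__ID by keeping the event with the highest committing writeId, so a delete that commits under a lower writeId than the data it targets is silently ignored. Below is a minimal, self-contained model of that rule; all class and method names are illustrative, not the actual Hive scan code.

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;
    import java.util.Set;

    // Toy model of ACID last-writer-wins resolution for a single ROW__ID.
    public class WriteIdOrderingSketch {
      record RowEvent(long writeId, boolean isDelete, String value) {}

      // Keep only committed events; the one with the highest writeId wins.
      static Optional<String> resolve(List<RowEvent> events, Set<Long> committed) {
        Optional<RowEvent> winner = events.stream()
            .filter(e -> committed.contains(e.writeId()))
            .max(Comparator.comparingLong(RowEvent::writeId));
        return winner.isPresent() && !winner.get().isDelete()
            ? Optional.of(winner.get().value()) : Optional.empty();
      }

      public static void main(String[] args) {
        List<RowEvent> events = List.of(
            new RowEvent(71, false, "v2"),  // insert committed first by txn 10 (writeId 71)
            new RowEvent(69, true, null));  // delete delta from txn 11 reusing stale writeId 69
        // Both commit, but the delete (69) loses to the insert (71): the row survives.
        System.out.println(resolve(events, Set.of(69L, 71L)));  // prints Optional[v2]
      }
    }
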
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 40ff171db22..de8b3a45df4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -110,6 +110,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
    * The local cache of table write IDs allocated/created by the current transaction
    */
   private Map<String, Long> tableWriteIds = new HashMap<>();
+  private boolean shouldReallocateWriteIds = false;
 
   /**
    * assigns a unique monotonically increasing ID to each statement
@@ -229,6 +230,13 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     return openTxn(ctx, user, txnType, 0);
   }
 
+  @Override
+  public void clearCaches() {
+    LOG.info("Clearing writeId cache for {}", JavaUtils.txnIdToString(txnId));
+    tableWriteIds.clear();
+    shouldReallocateWriteIds = true;
+  }
+
   @VisibleForTesting
   long openTxn(Context ctx, String user, TxnType txnType, long delay) throws LockException {
     /*Q: why don't we lock the snapshot here???  Instead of having client make an explicit call
@@ -251,6 +259,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       stmtId = 0;
       numStatements = 0;
       tableWriteIds.clear();
+      shouldReallocateWriteIds = false;
       isExplicitTransaction = false;
       startTransactionCount = 0;
       this.queryId = ctx.getConf().get(HiveConf.ConfVars.HIVEQUERYID.varname);
@@ -491,6 +500,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     stmtId = -1;
     numStatements = 0;
     tableWriteIds.clear();
+    shouldReallocateWriteIds = false;
     queryId = null;
     replPolicy = null;
   }
@@ -933,10 +943,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     assert isTxnOpen();
     return stmtId;
   }
+
   @Override
   public long getTableWriteId(String dbName, String tableName) throws LockException {
     assert isTxnOpen();
-    return getTableWriteId(dbName, tableName, true);
+    return getTableWriteId(dbName, tableName, true, shouldReallocateWriteIds);
   }
 
   @Override
@@ -946,7 +957,7 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     // to return 0 if the dbName:tableName's writeId is yet allocated.
     // This happens when the current context is before
     // Driver.acquireLocks() is called.
-    return getTableWriteId(dbName, tableName, false);
+    return getTableWriteId(dbName, tableName, false, false);
   }
 
   @Override
@@ -959,8 +970,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     }
   }
 
-  private long getTableWriteId(
-      String dbName, String tableName, boolean allocateIfNotYet) throws LockException {
+  private long getTableWriteId(String dbName, String tableName, boolean allocateIfNotYet,
+      boolean shouldReallocate) throws LockException {
     String fullTableName = AcidUtils.getFullTableName(dbName, tableName);
     if (tableWriteIds.containsKey(fullTableName)) {
       return tableWriteIds.get(fullTableName);
@@ -968,8 +979,9 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       return 0;
     }
     try {
-      long writeId = getMS().allocateTableWriteId(txnId, dbName, tableName);
-      LOG.debug("Allocated write ID {} for {}.{}", writeId, dbName, tableName);
+      long writeId = getMS().allocateTableWriteId(txnId, dbName, tableName, shouldReallocate);
+      LOG.info("Allocated write ID {} for {}.{} and {} (shouldReallocate: {}) ", writeId, dbName,
+          tableName, JavaUtils.txnIdToString(txnId), shouldReallocate);
       tableWriteIds.put(fullTableName, writeId);
       return writeId;
     } catch (TException e) {
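
The DbTxnManager change above is essentially a per-transaction cache plus a one-shot reallocation flag: clearCaches() empties the map and arms the flag, and the next getTableWriteId() call passes it through to the metastore. A stripped-down sketch of that pattern follows; the class and the allocator function are hypothetical stand-ins, not the real DbTxnManager or metastore client.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.BiFunction;

    // Sketch of the cache/reallocate interplay: (tableName, reallocate) -> writeId models the
    // metastore call; names here are illustrative.
    class WriteIdCacheSketch {
      private final Map<String, Long> tableWriteIds = new HashMap<>();
      private boolean shouldReallocateWriteIds = false;
      private final BiFunction<String, Boolean, Long> metastoreAllocator;

      WriteIdCacheSketch(BiFunction<String, Boolean, Long> metastoreAllocator) {
        this.metastoreAllocator = metastoreAllocator;
      }

      // Called by the driver before re-compilation of a still-valid transaction.
      void clearCaches() {
        tableWriteIds.clear();
        shouldReallocateWriteIds = true;
      }

      long getTableWriteId(String fullTableName) {
        return tableWriteIds.computeIfAbsent(fullTableName,
            t -> metastoreAllocator.apply(t, shouldReallocateWriteIds));
      }
    }
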
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 24deac59281..b114414e9c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -347,6 +347,14 @@ public interface HiveTxnManager {
   // Can be used by operation to set the stmt id when allocation is done somewhere else.
   int getCurrentStmtId();
 
+  /**
+   * Reset locally cached information.
+   * This is called before re-compilation, after acquiring the lock, if the transaction is not
+   * outdated. The intent is to clear any cached information such as writeIds (but not
+   * resetting/rolling back the overall transaction).
+   */
+  void clearCaches();
+
   /**
    * Acquire the materialization rebuild lock for a given view. We need to specify the fully
    * qualified name of the materialized view and the open transaction ID so we can identify
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
index 9d5f9aa4107..9897795db9c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManagerImpl.java
@@ -71,6 +71,11 @@ abstract class HiveTxnManagerImpl implements HiveTxnManager, Configurable {
     destruct();
   }
 
+  @Override
+  public void clearCaches() {
+    // no op - implementations should override as needed
+  }
+
   @Override
   public void acquireLocks(QueryPlan plan, Context ctx, String username, DriverState driverState) throws LockException {
     acquireLocks(plan, ctx, username);
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index e999500532a..c031ccb561b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -23849,6 +23849,11 @@ void AllocateTableWriteIdsRequest::__set_srcTxnToWriteIdList(const std::vector<T
   this->srcTxnToWriteIdList = val;
 __isset.srcTxnToWriteIdList = true;
 }
+
+void AllocateTableWriteIdsRequest::__set_reallocate(const bool val) {
+  this->reallocate = val;
+__isset.reallocate = true;
+}
 std::ostream& operator<<(std::ostream& out, const AllocateTableWriteIdsRequest& obj)
 {
   obj.printTo(out);
@@ -23943,6 +23948,14 @@ uint32_t AllocateTableWriteIdsRequest::read(::apache::thrift::protocol::TProtoco
           xfer += iprot->skip(ftype);
         }
         break;
+      case 6:
+        if (ftype == ::apache::thrift::protocol::T_BOOL) {
+          xfer += iprot->readBool(this->reallocate);
+          this->__isset.reallocate = true;
+        } else {
+          xfer += iprot->skip(ftype);
+        }
+        break;
       default:
         xfer += iprot->skip(ftype);
         break;
@@ -24003,6 +24016,11 @@ uint32_t AllocateTableWriteIdsRequest::write(::apache::thrift::protocol::TProtoc
     }
     xfer += oprot->writeFieldEnd();
   }
+  if (this->__isset.reallocate) {
+    xfer += oprot->writeFieldBegin("reallocate", ::apache::thrift::protocol::T_BOOL, 6);
+    xfer += oprot->writeBool(this->reallocate);
+    xfer += oprot->writeFieldEnd();
+  }
   xfer += oprot->writeFieldStop();
   xfer += oprot->writeStructEnd();
   return xfer;
@@ -24015,6 +24033,7 @@ void swap(AllocateTableWriteIdsRequest &a, AllocateTableWriteIdsRequest &b) {
   swap(a.txnIds, b.txnIds);
   swap(a.replPolicy, b.replPolicy);
   swap(a.srcTxnToWriteIdList, b.srcTxnToWriteIdList);
+  swap(a.reallocate, b.reallocate);
   swap(a.__isset, b.__isset);
 }
 
@@ -24024,6 +24043,7 @@ AllocateTableWriteIdsRequest::AllocateTableWriteIdsRequest(const AllocateTableWr
   txnIds = other897.txnIds;
   replPolicy = other897.replPolicy;
   srcTxnToWriteIdList = other897.srcTxnToWriteIdList;
+  reallocate = other897.reallocate;
   __isset = other897.__isset;
 }
 AllocateTableWriteIdsRequest& AllocateTableWriteIdsRequest::operator=(const AllocateTableWriteIdsRequest& other898) {
@@ -24032,6 +24052,7 @@ AllocateTableWriteIdsRequest& AllocateTableWriteIdsRequest::operator=(const Allo
   txnIds = other898.txnIds;
   replPolicy = other898.replPolicy;
   srcTxnToWriteIdList = other898.srcTxnToWriteIdList;
+  reallocate = other898.reallocate;
   __isset = other898.__isset;
   return *this;
 }
@@ -24043,6 +24064,7 @@ void AllocateTableWriteIdsRequest::printTo(std::ostream& out) const {
   out << ", " << "txnIds="; (__isset.txnIds ? (out << to_string(txnIds)) : (out << "<null>"));
   out << ", " << "replPolicy="; (__isset.replPolicy ? (out << to_string(replPolicy)) : (out << "<null>"));
   out << ", " << "srcTxnToWriteIdList="; (__isset.srcTxnToWriteIdList ? (out << to_string(srcTxnToWriteIdList)) : (out << "<null>"));
+  out << ", " << "reallocate="; (__isset.reallocate ? (out << to_string(reallocate)) : (out << "<null>"));
   out << ")";
 }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
index e5b1b3512be..152b99a4e0b 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h
@@ -9489,10 +9489,11 @@ void swap(TxnToWriteId &a, TxnToWriteId &b);
 std::ostream& operator<<(std::ostream& out, const TxnToWriteId& obj);
 
 typedef struct _AllocateTableWriteIdsRequest__isset {
-  _AllocateTableWriteIdsRequest__isset() : txnIds(false), replPolicy(false), srcTxnToWriteIdList(false) {}
+  _AllocateTableWriteIdsRequest__isset() : txnIds(false), replPolicy(false), srcTxnToWriteIdList(false), reallocate(true) {}
   bool txnIds :1;
   bool replPolicy :1;
   bool srcTxnToWriteIdList :1;
+  bool reallocate :1;
 } _AllocateTableWriteIdsRequest__isset;
 
 class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
@@ -9503,7 +9504,8 @@ class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
   AllocateTableWriteIdsRequest() noexcept
                                : dbName(),
                                  tableName(),
-                                 replPolicy() {
+                                 replPolicy(),
+                                 reallocate(false) {
   }
 
   virtual ~AllocateTableWriteIdsRequest() noexcept;
@@ -9512,6 +9514,7 @@ class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
   std::vector<int64_t>  txnIds;
   std::string replPolicy;
   std::vector<TxnToWriteId>  srcTxnToWriteIdList;
+  bool reallocate;
 
   _AllocateTableWriteIdsRequest__isset __isset;
 
@@ -9525,6 +9528,8 @@ class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
 
   void __set_srcTxnToWriteIdList(const std::vector<TxnToWriteId> & val);
 
+  void __set_reallocate(const bool val);
+
   bool operator == (const AllocateTableWriteIdsRequest & rhs) const
   {
     if (!(dbName == rhs.dbName))
@@ -9543,6 +9548,10 @@ class AllocateTableWriteIdsRequest : public virtual ::apache::thrift::TBase {
       return false;
     else if (__isset.srcTxnToWriteIdList && !(srcTxnToWriteIdList == rhs.srcTxnToWriteIdList))
       return false;
+    if (__isset.reallocate != rhs.__isset.reallocate)
+      return false;
+    else if (__isset.reallocate && !(reallocate == rhs.reallocate))
+      return false;
     return true;
   }
   bool operator != (const AllocateTableWriteIdsRequest &rhs) const {
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsRequest.java
index 58b31cce26b..b94c793c7ff 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsRequest.java
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/AllocateTableWriteIdsRequest.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.metastore.api;
   private static final org.apache.thrift.protocol.TField TXN_IDS_FIELD_DESC = new org.apache.thrift.protocol.TField("txnIds", org.apache.thrift.protocol.TType.LIST, (short)3);
   private static final org.apache.thrift.protocol.TField REPL_POLICY_FIELD_DESC = new org.apache.thrift.protocol.TField("replPolicy", org.apache.thrift.protocol.TType.STRING, (short)4);
   private static final org.apache.thrift.protocol.TField SRC_TXN_TO_WRITE_ID_LIST_FIELD_DESC = new org.apache.thrift.protocol.TField("srcTxnToWriteIdList", org.apache.thrift.protocol.TType.LIST, (short)5);
+  private static final org.apache.thrift.protocol.TField REALLOCATE_FIELD_DESC = new org.apache.thrift.protocol.TField("reallocate", org.apache.thrift.protocol.TType.BOOL, (short)6);
 
   private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new AllocateTableWriteIdsRequestStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new AllocateTableWriteIdsRequestTupleSchemeFactory();
@@ -25,6 +26,7 @@ package org.apache.hadoop.hive.metastore.api;
   private @org.apache.thrift.annotation.Nullable java.util.List<java.lang.Long> txnIds; // optional
   private @org.apache.thrift.annotation.Nullable java.lang.String replPolicy; // optional
   private @org.apache.thrift.annotation.Nullable java.util.List<TxnToWriteId> srcTxnToWriteIdList; // optional
+  private boolean reallocate; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -32,7 +34,8 @@ package org.apache.hadoop.hive.metastore.api;
     TABLE_NAME((short)2, "tableName"),
     TXN_IDS((short)3, "txnIds"),
     REPL_POLICY((short)4, "replPolicy"),
-    SRC_TXN_TO_WRITE_ID_LIST((short)5, "srcTxnToWriteIdList");
+    SRC_TXN_TO_WRITE_ID_LIST((short)5, "srcTxnToWriteIdList"),
+    REALLOCATE((short)6, "reallocate");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -58,6 +61,8 @@ package org.apache.hadoop.hive.metastore.api;
           return REPL_POLICY;
         case 5: // SRC_TXN_TO_WRITE_ID_LIST
           return SRC_TXN_TO_WRITE_ID_LIST;
+        case 6: // REALLOCATE
+          return REALLOCATE;
         default:
           return null;
       }
@@ -99,7 +104,9 @@ package org.apache.hadoop.hive.metastore.api;
   }
 
   // isset id assignments
-  private static final _Fields optionals[] = {_Fields.TXN_IDS,_Fields.REPL_POLICY,_Fields.SRC_TXN_TO_WRITE_ID_LIST};
+  private static final int __REALLOCATE_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  private static final _Fields optionals[] = {_Fields.TXN_IDS,_Fields.REPL_POLICY,_Fields.SRC_TXN_TO_WRITE_ID_LIST,_Fields.REALLOCATE};
   public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -115,11 +122,15 @@ package org.apache.hadoop.hive.metastore.api;
     tmpMap.put(_Fields.SRC_TXN_TO_WRITE_ID_LIST, new org.apache.thrift.meta_data.FieldMetaData("srcTxnToWriteIdList", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TxnToWriteId.class))));
+    tmpMap.put(_Fields.REALLOCATE, new org.apache.thrift.meta_data.FieldMetaData("reallocate", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(AllocateTableWriteIdsRequest.class, metaDataMap);
   }
 
   public AllocateTableWriteIdsRequest() {
+    this.reallocate = false;
+
   }
 
   public AllocateTableWriteIdsRequest(
@@ -135,6 +146,7 @@ package org.apache.hadoop.hive.metastore.api;
    * Performs a deep copy on <i>other</i>.
    */
   public AllocateTableWriteIdsRequest(AllocateTableWriteIdsRequest other) {
+    __isset_bitfield = other.__isset_bitfield;
     if (other.isSetDbName()) {
       this.dbName = other.dbName;
     }
@@ -155,6 +167,7 @@ package org.apache.hadoop.hive.metastore.api;
       }
       this.srcTxnToWriteIdList = __this__srcTxnToWriteIdList;
     }
+    this.reallocate = other.reallocate;
   }
 
   public AllocateTableWriteIdsRequest deepCopy() {
@@ -168,6 +181,8 @@ package org.apache.hadoop.hive.metastore.api;
     this.txnIds = null;
     this.replPolicy = null;
     this.srcTxnToWriteIdList = null;
+    this.reallocate = false;
+
   }
 
   @org.apache.thrift.annotation.Nullable
@@ -322,6 +337,28 @@ package org.apache.hadoop.hive.metastore.api;
     }
   }
 
+  public boolean isReallocate() {
+    return this.reallocate;
+  }
+
+  public void setReallocate(boolean reallocate) {
+    this.reallocate = reallocate;
+    setReallocateIsSet(true);
+  }
+
+  public void unsetReallocate() {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __REALLOCATE_ISSET_ID);
+  }
+
+  /** Returns true if field reallocate is set (has been assigned a value) and false otherwise */
+  public boolean isSetReallocate() {
+    return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __REALLOCATE_ISSET_ID);
+  }
+
+  public void setReallocateIsSet(boolean value) {
+    __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __REALLOCATE_ISSET_ID, value);
+  }
+
   public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
     case DB_NAME:
@@ -364,6 +401,14 @@ package org.apache.hadoop.hive.metastore.api;
       }
       break;
 
+    case REALLOCATE:
+      if (value == null) {
+        unsetReallocate();
+      } else {
+        setReallocate((java.lang.Boolean)value);
+      }
+      break;
+
     }
   }
 
@@ -385,6 +430,9 @@ package org.apache.hadoop.hive.metastore.api;
     case SRC_TXN_TO_WRITE_ID_LIST:
       return getSrcTxnToWriteIdList();
 
+    case REALLOCATE:
+      return isReallocate();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -406,6 +454,8 @@ package org.apache.hadoop.hive.metastore.api;
       return isSetReplPolicy();
     case SRC_TXN_TO_WRITE_ID_LIST:
       return isSetSrcTxnToWriteIdList();
+    case REALLOCATE:
+      return isSetReallocate();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -468,6 +518,15 @@ package org.apache.hadoop.hive.metastore.api;
         return false;
     }
 
+    boolean this_present_reallocate = true && this.isSetReallocate();
+    boolean that_present_reallocate = true && that.isSetReallocate();
+    if (this_present_reallocate || that_present_reallocate) {
+      if (!(this_present_reallocate && that_present_reallocate))
+        return false;
+      if (this.reallocate != that.reallocate)
+        return false;
+    }
+
     return true;
   }
 
@@ -495,6 +554,10 @@ package org.apache.hadoop.hive.metastore.api;
     if (isSetSrcTxnToWriteIdList())
       hashCode = hashCode * 8191 + srcTxnToWriteIdList.hashCode();
 
+    hashCode = hashCode * 8191 + ((isSetReallocate()) ? 131071 : 524287);
+    if (isSetReallocate())
+      hashCode = hashCode * 8191 + ((reallocate) ? 131071 : 524287);
+
     return hashCode;
   }
 
@@ -556,6 +619,16 @@ package org.apache.hadoop.hive.metastore.api;
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetReallocate(), other.isSetReallocate());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetReallocate()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.reallocate, other.reallocate);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -622,6 +695,12 @@ package org.apache.hadoop.hive.metastore.api;
       }
       first = false;
     }
+    if (isSetReallocate()) {
+      if (!first) sb.append(", ");
+      sb.append("reallocate:");
+      sb.append(this.reallocate);
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -649,6 +728,8 @@ package org.apache.hadoop.hive.metastore.api;
 
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException {
     try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
     } catch (org.apache.thrift.TException te) {
       throw new java.io.IOException(te);
@@ -734,6 +815,14 @@ package org.apache.hadoop.hive.metastore.api;
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 6: // REALLOCATE
+            if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
+              struct.reallocate = iprot.readBool();
+              struct.setReallocateIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -792,6 +881,11 @@ package org.apache.hadoop.hive.metastore.api;
           oprot.writeFieldEnd();
         }
       }
+      if (struct.isSetReallocate()) {
+        oprot.writeFieldBegin(REALLOCATE_FIELD_DESC);
+        oprot.writeBool(struct.reallocate);
+        oprot.writeFieldEnd();
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -821,7 +915,10 @@ package org.apache.hadoop.hive.metastore.api;
       if (struct.isSetSrcTxnToWriteIdList()) {
         optionals.set(2);
       }
-      oprot.writeBitSet(optionals, 3);
+      if (struct.isSetReallocate()) {
+        optionals.set(3);
+      }
+      oprot.writeBitSet(optionals, 4);
       if (struct.isSetTxnIds()) {
         {
           oprot.writeI32(struct.txnIds.size());
@@ -843,6 +940,9 @@ package org.apache.hadoop.hive.metastore.api;
           }
         }
       }
+      if (struct.isSetReallocate()) {
+        oprot.writeBool(struct.reallocate);
+      }
     }
 
     @Override
@@ -852,7 +952,7 @@ package org.apache.hadoop.hive.metastore.api;
       struct.setDbNameIsSet(true);
       struct.tableName = iprot.readString();
       struct.setTableNameIsSet(true);
-      java.util.BitSet incoming = iprot.readBitSet(3);
+      java.util.BitSet incoming = iprot.readBitSet(4);
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TList _list808 = iprot.readListBegin(org.apache.thrift.protocol.TType.I64);
@@ -884,6 +984,10 @@ package org.apache.hadoop.hive.metastore.api;
         }
         struct.setSrcTxnToWriteIdListIsSet(true);
       }
+      if (incoming.get(3)) {
+        struct.reallocate = iprot.readBool();
+        struct.setReallocateIsSet(true);
+      }
     }
   }
 
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/AllocateTableWriteIdsRequest.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/AllocateTableWriteIdsRequest.php
index af402a9afff..1d43e8d3f9c 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/AllocateTableWriteIdsRequest.php
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/AllocateTableWriteIdsRequest.php
@@ -55,6 +55,11 @@ class AllocateTableWriteIdsRequest
                 'class' => '\metastore\TxnToWriteId',
                 ),
         ),
+        6 => array(
+            'var' => 'reallocate',
+            'isRequired' => false,
+            'type' => TType::BOOL,
+        ),
     );
 
     /**
@@ -77,6 +82,10 @@ class AllocateTableWriteIdsRequest
      * @var \metastore\TxnToWriteId[]
      */
     public $srcTxnToWriteIdList = null;
+    /**
+     * @var bool
+     */
+    public $reallocate = false;
 
     public function __construct($vals = null)
     {
@@ -96,6 +105,9 @@ class AllocateTableWriteIdsRequest
             if (isset($vals['srcTxnToWriteIdList'])) {
                 $this->srcTxnToWriteIdList = $vals['srcTxnToWriteIdList'];
             }
+            if (isset($vals['reallocate'])) {
+                $this->reallocate = $vals['reallocate'];
+            }
         }
     }
 
@@ -172,6 +184,13 @@ class AllocateTableWriteIdsRequest
                         $xfer += $input->skip($ftype);
                     }
                     break;
+                case 6:
+                    if ($ftype == TType::BOOL) {
+                        $xfer += $input->readBool($this->reallocate);
+                    } else {
+                        $xfer += $input->skip($ftype);
+                    }
+                    break;
                 default:
                     $xfer += $input->skip($ftype);
                     break;
@@ -225,6 +244,11 @@ class AllocateTableWriteIdsRequest
             $output->writeListEnd();
             $xfer += $output->writeFieldEnd();
         }
+        if ($this->reallocate !== null) {
+            $xfer += $output->writeFieldBegin('reallocate', TType::BOOL, 6);
+            $xfer += $output->writeBool($this->reallocate);
+            $xfer += $output->writeFieldEnd();
+        }
         $xfer += $output->writeFieldStop();
         $xfer += $output->writeStructEnd();
         return $xfer;
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 80b02ec1437..450de49e2e8 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -13651,16 +13651,18 @@ class AllocateTableWriteIdsRequest(object):
      - txnIds
      - replPolicy
      - srcTxnToWriteIdList
+     - reallocate
 
     """
 
 
-    def __init__(self, dbName=None, tableName=None, txnIds=None, replPolicy=None, srcTxnToWriteIdList=None,):
+    def __init__(self, dbName=None, tableName=None, txnIds=None, replPolicy=None, srcTxnToWriteIdList=None, reallocate=False,):
         self.dbName = dbName
         self.tableName = tableName
         self.txnIds = txnIds
         self.replPolicy = replPolicy
         self.srcTxnToWriteIdList = srcTxnToWriteIdList
+        self.reallocate = reallocate
 
     def read(self, iprot):
         if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None:
@@ -13707,6 +13709,11 @@ class AllocateTableWriteIdsRequest(object):
                     iprot.readListEnd()
                 else:
                     iprot.skip(ftype)
+            elif fid == 6:
+                if ftype == TType.BOOL:
+                    self.reallocate = iprot.readBool()
+                else:
+                    iprot.skip(ftype)
             else:
                 iprot.skip(ftype)
             iprot.readFieldEnd()
@@ -13743,6 +13750,10 @@ class AllocateTableWriteIdsRequest(object):
                 iter715.write(oprot)
             oprot.writeListEnd()
             oprot.writeFieldEnd()
+        if self.reallocate is not None:
+            oprot.writeFieldBegin('reallocate', TType.BOOL, 6)
+            oprot.writeBool(self.reallocate)
+            oprot.writeFieldEnd()
         oprot.writeFieldStop()
         oprot.writeStructEnd()
 
@@ -30797,6 +30808,7 @@ AllocateTableWriteIdsRequest.thrift_spec = (
     (3, TType.LIST, 'txnIds', (TType.I64, None, False), None, ),  # 3
     (4, TType.STRING, 'replPolicy', 'UTF8', None, ),  # 4
     (5, TType.LIST, 'srcTxnToWriteIdList', (TType.STRUCT, [TxnToWriteId, None], False), None, ),  # 5
+    (6, TType.BOOL, 'reallocate', None, False, ),  # 6
 )
 all_structs.append(AllocateTableWriteIdsResponse)
 AllocateTableWriteIdsResponse.thrift_spec = (
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 065e2642dc4..929b8cd7147 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -4034,13 +4034,15 @@ class AllocateTableWriteIdsRequest
   TXNIDS = 3
   REPLPOLICY = 4
   SRCTXNTOWRITEIDLIST = 5
+  REALLOCATE = 6
 
   FIELDS = {
     DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
     TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
     TXNIDS => {:type => ::Thrift::Types::LIST, :name => 'txnIds', :element => {:type => ::Thrift::Types::I64}, :optional => true},
     REPLPOLICY => {:type => ::Thrift::Types::STRING, :name => 'replPolicy', :optional => true},
-    SRCTXNTOWRITEIDLIST => {:type => ::Thrift::Types::LIST, :name => 'srcTxnToWriteIdList', :element => {:type => ::Thrift::Types::STRUCT, :class => ::TxnToWriteId}, :optional => true}
+    SRCTXNTOWRITEIDLIST => {:type => ::Thrift::Types::LIST, :name => 'srcTxnToWriteIdList', :element => {:type => ::Thrift::Types::STRUCT, :class => ::TxnToWriteId}, :optional => true},
+    REALLOCATE => {:type => ::Thrift::Types::BOOL, :name => 'reallocate', :default => false, :optional => true}
   }
 
   def struct_fields; FIELDS; end
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 93229b95007..3cfbcba027d 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -115,7 +115,7 @@ import com.google.common.collect.Lists;
 public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
 
   private final String CLASS_NAME = HiveMetaStoreClient.class.getName();
-  
+
   public static final String MANUALLY_INITIATED_COMPACTION = "manual";
   public static final String TRUNCATE_SKIP_DATA_DELETION = "truncateSkipDataDeletion";
   public static final String RENAME_PARTITION_MAKE_COPY = "renamePartitionMakeCopy";
@@ -1670,7 +1670,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
    * @param maxBatchSize
    * @throws TException
    */
-  private void dropDatabaseCascadePerTable(DropDatabaseRequest req, List<String> tableList, int maxBatchSize) 
+  private void dropDatabaseCascadePerTable(DropDatabaseRequest req, List<String> tableList, int maxBatchSize)
       throws TException {
     String dbNameWithCatalog = prependCatalogToDbName(req.getCatalogName(), req.getName(), conf);
     for (Table table : new TableIterable(
@@ -1689,7 +1689,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
           context.putToProperties(hive_metastoreConstants.TXN_ID, String.valueOf(req.getTxnId()));
           req.setDeleteManagedDir(false);
         }
-        client.drop_table_with_environment_context(dbNameWithCatalog, table.getTableName(), 
+        client.drop_table_with_environment_context(dbNameWithCatalog, table.getTableName(),
             req.isDeleteData() && !isSoftDelete, context);
         if (hook != null) {
           hook.commitDropTable(table, req.isDeleteData());
@@ -1877,7 +1877,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     req.setDeleteData(options.deleteData);
     req.setNeedResult(options.returnResults);
     req.setIfExists(options.ifExists);
-    
+
     EnvironmentContext context = null;
     if (options.purgeData) {
       LOG.info("Dropped partitions will be purged!");
@@ -1892,7 +1892,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
       context.putToProperties(hive_metastoreConstants.TXN_ID, options.txnId.toString());
     }
     req.setEnvironmentContext(context);
-    
+
     return client.drop_partitions_req(req).getPartitions();
   }
 
@@ -1921,7 +1921,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     }
     String catName = Optional.ofNullable(tbl.getCatName()).orElse(getDefaultCatalog(conf));
 
-    dropTable(catName, tbl.getDbName(), tbl.getTableName(), deleteData, 
+    dropTable(catName, tbl.getDbName(), tbl.getTableName(), deleteData,
         ignoreUnknownTbl, context);
   }
 
@@ -2001,7 +2001,7 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
     truncateTableInternal(getDefaultCatalog(conf),
         dbName, tableName, partNames, validWriteIds, writeId, deleteData);
   }
-  
+
   @Override
   public void truncateTable(String dbName, String tableName, List<String> partNames,
       String validWriteIds, long writeId) throws TException {
@@ -4053,14 +4053,25 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
 
   @Override
   public long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException {
-    return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName).get(0).getWriteId();
+    return allocateTableWriteId(txnId, dbName, tableName, false);
   }
 
   @Override
-  public List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName)
-          throws TException {
+  public long allocateTableWriteId(long txnId, String dbName, String tableName, boolean shouldRealloc) throws TException {
+    return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName, shouldRealloc).get(0).getWriteId();
+  }
+
+
+  @Override
+  public List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName) throws TException {
+    return allocateTableWriteIdsBatch(txnIds, dbName, tableName, false);
+  }
+
+  private List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName,
+      boolean shouldRealloc) throws TException {
     AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tableName);
     rqst.setTxnIds(txnIds);
+    rqst.setReallocate(shouldRealloc);
     return allocateTableWriteIdsBatchIntr(rqst);
   }
 
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 9bde71ffa61..576bf99698c 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -3236,6 +3236,16 @@ public interface IMetaStoreClient {
    */
   long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException;
 
+  /**
+   * Allocate a per table write ID and associate it with the given transaction.
+   * @param txnId id of the transaction with which the allocated write ID is to be associated.
+   * @param dbName name of the DB to which the table belongs.
+   * @param tableName table for which the write ID is to be allocated
+   * @param reallocate whether to reallocate an already mapped writeId (if true) or reuse it (if false)
+   * @throws TException
+   */
+  long allocateTableWriteId(long txnId, String dbName, String tableName, boolean reallocate) throws TException;
+
   /**
    * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
    * @param validWriteIdList Snapshot of writeid list when the table/partition is dumped.
diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
index 4889d3a4ed9..0b01a7a796d 100644
--- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift
@@ -1166,6 +1166,9 @@ struct AllocateTableWriteIdsRequest {
     4: optional string replPolicy,
     // The list is assumed to be sorted by both txnids and write ids. The write id list is assumed to be contiguous.
     5: optional list<TxnToWriteId> srcTxnToWriteIdList,
+    // If false, reuse previously allocated writeIds for the txnIds. If true, remove older txnId-to-writeId
+    // mappings and regenerate them (useful during re-compilation, when writeIds must be regenerated)
+    6: optional bool reallocate = false;
 }
 
 struct AllocateTableWriteIdsResponse {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 5ab11b1c6bc..ec6870b763e 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -2123,6 +2123,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     List<Long> txnIds;
     String dbName = rqst.getDbName().toLowerCase();
     String tblName = rqst.getTableName().toLowerCase();
+    boolean shouldReallocate = rqst.isReallocate();
     try {
       Connection dbConn = null;
       PreparedStatement pStmt = null;
@@ -2180,33 +2181,48 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         List<String> queries = new ArrayList<>();
         StringBuilder prefix = new StringBuilder();
         StringBuilder suffix = new StringBuilder();
-
-        // Traverse the TXN_TO_WRITE_ID to see if any of the input txns already have allocated a
-        // write id for the same db.table. If yes, then need to reuse it else have to allocate new one
-        // The write id would have been already allocated in case of multi-statement txns where
-        // first write on a table will allocate write id and rest of the writes should re-use it.
-        prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE")
-              .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND ");
-        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
-                txnIds, "\"T2W_TXNID\"", false, false);
-
-        long allocatedTxnsCount = 0;
         long writeId;
+        int allocatedTxnsCount = 0;
         List<String> params = Arrays.asList(dbName, tblName);
-        for (String query : queries) {
-          pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
-          LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">",
-                  quoteString(dbName), quoteString(tblName));
-          rs = pStmt.executeQuery();
-          while (rs.next()) {
-            // If table write ID is already allocated for the given transaction, then just use it
-            long txnId = rs.getLong(1);
-            writeId = rs.getLong(2);
-            txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
-            allocatedTxnsCount++;
-            LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId);
+        if (shouldReallocate) {
+          // During query recompilation after lock acquisition, it is important to allocate new writeIds
+          // to ensure writeIds are committed in increasing order.
+          prefix.append("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE")
+                .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND ");
+          TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+              txnIds, "\"T2W_TXNID\"", false, false);
+          for (String query : queries) {
+            pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
+            LOG.debug("Going to execute delete <" + query.replaceAll("\\?", "{}") + ">",
+                quoteString(dbName), quoteString(tblName));
+            int numRowsDeleted = pStmt.executeUpdate();
+            LOG.info("Removed {} prior writeIds during reallocation", numRowsDeleted);
+            closeStmt(pStmt);
+          }
+        } else {
+          // Traverse the TXN_TO_WRITE_ID to see if any of the input txns already have allocated a
+          // write id for the same db.table. If yes, then need to reuse it else have to allocate new one
+          // The write id would have been already allocated in case of multi-statement txns where
+          // first write on a table will allocate write id and rest of the writes should re-use it.
+          prefix.append("SELECT \"T2W_TXNID\", \"T2W_WRITEID\" FROM \"TXN_TO_WRITE_ID\" WHERE")
+                .append(" \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND ");
+          TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
+              txnIds, "\"T2W_TXNID\"", false, false);
+          for (String query : queries) {
+            pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
+            LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">",
+                quoteString(dbName), quoteString(tblName));
+            rs = pStmt.executeQuery();
+            while (rs.next()) {
+              // If table write ID is already allocated for the given transaction, then just use it
+              long txnId = rs.getLong(1);
+              writeId = rs.getLong(2);
+              txnToWriteIds.add(new TxnToWriteId(txnId, writeId));
+              allocatedTxnsCount++;
+              LOG.info("Reused already allocated writeID: {} for txnId: {}", writeId, txnId);
+            }
+            closeStmt(pStmt);
           }
-          closeStmt(pStmt);
         }
 
         // Batch allocation should always happen atomically. Either write ids for all txns is allocated or none.
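
On the server side, the reallocate branch amounts to "forget the old mapping, then allocate above the high-water mark as usual". A toy in-memory model of that policy is below; it is purely illustrative, whereas the real TxnHandler does this with SQL against TXN_TO_WRITE_ID as shown in the hunk above.

    import java.util.HashMap;
    import java.util.Map;

    // Toy model: with reallocate=true the stale txn -> writeId mapping is dropped, so the
    // transaction receives the next id above the table's high-water mark instead of its old,
    // lower id. Mirrors the DELETE-vs-SELECT branch in allocateTableWriteIds above.
    class WriteIdAllocatorSketch {
      private final Map<Long, Long> txnToWriteId = new HashMap<>();
      private long nextWriteId = 1;  // stand-in for the table's write-id high-water mark + 1

      long allocate(long txnId, boolean reallocate) {
        if (reallocate) {
          txnToWriteId.remove(txnId);
        }
        return txnToWriteId.computeIfAbsent(txnId, t -> nextWriteId++);
      }
    }
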
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
index a21ee727890..31a3d65a11c 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClientPreCatalog.java
@@ -2542,17 +2542,27 @@ public class HiveMetaStoreClientPreCatalog implements IMetaStoreClient, AutoClos
     }
     client.repl_tbl_writeid_state(rqst);
   }
+  @Override
+  public long allocateTableWriteId(long txnId, String dbName, String tableName, boolean shouldRealloc) throws TException {
+    return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName, shouldRealloc).get(0).getWriteId();
+  }
 
   @Override
   public long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException {
-    return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName).get(0).getWriteId();
+    return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName, false).get(0).getWriteId();
   }
 
   @Override
   public List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName)
           throws TException {
+    return allocateTableWriteIdsBatch(txnIds, dbName, tableName, false);
+  }
+
+  public List<TxnToWriteId> allocateTableWriteIdsBatch(List<Long> txnIds, String dbName, String tableName, boolean shouldRealloc)
+          throws TException {
     AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tableName);
     rqst.setTxnIds(txnIds);
+    rqst.setReallocate(shouldRealloc);
     return allocateTableWriteIdsBatchIntr(rqst);
   }