You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dj...@apache.org on 2018/05/08 18:42:07 UTC

[02/58] [abbrv] hive git commit: HIVE-18988: Support bootstrap replication of ACID tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index b1e577a..accb94f 100644
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -11532,6 +11532,154 @@ class CommitTxnRequest:
   def __ne__(self, other):
     return not (self == other)
 
+class ReplTblWriteIdStateRequest:
+  """
+  Attributes:
+   - validWriteIdlist
+   - user
+   - hostName
+   - dbName
+   - tableName
+   - partNames
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'validWriteIdlist', None, None, ), # 1
+    (2, TType.STRING, 'user', None, None, ), # 2
+    (3, TType.STRING, 'hostName', None, None, ), # 3
+    (4, TType.STRING, 'dbName', None, None, ), # 4
+    (5, TType.STRING, 'tableName', None, None, ), # 5
+    (6, TType.LIST, 'partNames', (TType.STRING,None), None, ), # 6
+  )
+
+  def __init__(self, validWriteIdlist=None, user=None, hostName=None, dbName=None, tableName=None, partNames=None,):
+    self.validWriteIdlist = validWriteIdlist
+    self.user = user
+    self.hostName = hostName
+    self.dbName = dbName
+    self.tableName = tableName
+    self.partNames = partNames
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.validWriteIdlist = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.user = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.STRING:
+          self.hostName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.STRING:
+          self.dbName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.STRING:
+          self.tableName = iprot.readString()
+        else:
+          iprot.skip(ftype)
+      elif fid == 6:
+        if ftype == TType.LIST:
+          self.partNames = []
+          (_etype526, _size523) = iprot.readListBegin()
+          for _i527 in xrange(_size523):
+            _elem528 = iprot.readString()
+            self.partNames.append(_elem528)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('ReplTblWriteIdStateRequest')
+    if self.validWriteIdlist is not None:
+      oprot.writeFieldBegin('validWriteIdlist', TType.STRING, 1)
+      oprot.writeString(self.validWriteIdlist)
+      oprot.writeFieldEnd()
+    if self.user is not None:
+      oprot.writeFieldBegin('user', TType.STRING, 2)
+      oprot.writeString(self.user)
+      oprot.writeFieldEnd()
+    if self.hostName is not None:
+      oprot.writeFieldBegin('hostName', TType.STRING, 3)
+      oprot.writeString(self.hostName)
+      oprot.writeFieldEnd()
+    if self.dbName is not None:
+      oprot.writeFieldBegin('dbName', TType.STRING, 4)
+      oprot.writeString(self.dbName)
+      oprot.writeFieldEnd()
+    if self.tableName is not None:
+      oprot.writeFieldBegin('tableName', TType.STRING, 5)
+      oprot.writeString(self.tableName)
+      oprot.writeFieldEnd()
+    if self.partNames is not None:
+      oprot.writeFieldBegin('partNames', TType.LIST, 6)
+      oprot.writeListBegin(TType.STRING, len(self.partNames))
+      for iter529 in self.partNames:
+        oprot.writeString(iter529)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.validWriteIdlist is None:
+      raise TProtocol.TProtocolException(message='Required field validWriteIdlist is unset!')
+    if self.user is None:
+      raise TProtocol.TProtocolException(message='Required field user is unset!')
+    if self.hostName is None:
+      raise TProtocol.TProtocolException(message='Required field hostName is unset!')
+    if self.dbName is None:
+      raise TProtocol.TProtocolException(message='Required field dbName is unset!')
+    if self.tableName is None:
+      raise TProtocol.TProtocolException(message='Required field tableName is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.validWriteIdlist)
+    value = (value * 31) ^ hash(self.user)
+    value = (value * 31) ^ hash(self.hostName)
+    value = (value * 31) ^ hash(self.dbName)
+    value = (value * 31) ^ hash(self.tableName)
+    value = (value * 31) ^ hash(self.partNames)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class GetValidWriteIdsRequest:
   """
   Attributes:
@@ -11561,10 +11709,10 @@ class GetValidWriteIdsRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fullTableNames = []
-          (_etype526, _size523) = iprot.readListBegin()
-          for _i527 in xrange(_size523):
-            _elem528 = iprot.readString()
-            self.fullTableNames.append(_elem528)
+          (_etype533, _size530) = iprot.readListBegin()
+          for _i534 in xrange(_size530):
+            _elem535 = iprot.readString()
+            self.fullTableNames.append(_elem535)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11586,8 +11734,8 @@ class GetValidWriteIdsRequest:
     if self.fullTableNames is not None:
       oprot.writeFieldBegin('fullTableNames', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.fullTableNames))
-      for iter529 in self.fullTableNames:
-        oprot.writeString(iter529)
+      for iter536 in self.fullTableNames:
+        oprot.writeString(iter536)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.validTxnList is not None:
@@ -11670,10 +11818,10 @@ class TableValidWriteIds:
       elif fid == 3:
         if ftype == TType.LIST:
           self.invalidWriteIds = []
-          (_etype533, _size530) = iprot.readListBegin()
-          for _i534 in xrange(_size530):
-            _elem535 = iprot.readI64()
-            self.invalidWriteIds.append(_elem535)
+          (_etype540, _size537) = iprot.readListBegin()
+          for _i541 in xrange(_size537):
+            _elem542 = iprot.readI64()
+            self.invalidWriteIds.append(_elem542)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11708,8 +11856,8 @@ class TableValidWriteIds:
     if self.invalidWriteIds is not None:
       oprot.writeFieldBegin('invalidWriteIds', TType.LIST, 3)
       oprot.writeListBegin(TType.I64, len(self.invalidWriteIds))
-      for iter536 in self.invalidWriteIds:
-        oprot.writeI64(iter536)
+      for iter543 in self.invalidWriteIds:
+        oprot.writeI64(iter543)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.minOpenWriteId is not None:
@@ -11781,11 +11929,11 @@ class GetValidWriteIdsResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.tblValidWriteIds = []
-          (_etype540, _size537) = iprot.readListBegin()
-          for _i541 in xrange(_size537):
-            _elem542 = TableValidWriteIds()
-            _elem542.read(iprot)
-            self.tblValidWriteIds.append(_elem542)
+          (_etype547, _size544) = iprot.readListBegin()
+          for _i548 in xrange(_size544):
+            _elem549 = TableValidWriteIds()
+            _elem549.read(iprot)
+            self.tblValidWriteIds.append(_elem549)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11802,8 +11950,8 @@ class GetValidWriteIdsResponse:
     if self.tblValidWriteIds is not None:
       oprot.writeFieldBegin('tblValidWriteIds', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.tblValidWriteIds))
-      for iter543 in self.tblValidWriteIds:
-        iter543.write(oprot)
+      for iter550 in self.tblValidWriteIds:
+        iter550.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -11879,10 +12027,10 @@ class AllocateTableWriteIdsRequest:
       elif fid == 3:
         if ftype == TType.LIST:
           self.txnIds = []
-          (_etype547, _size544) = iprot.readListBegin()
-          for _i548 in xrange(_size544):
-            _elem549 = iprot.readI64()
-            self.txnIds.append(_elem549)
+          (_etype554, _size551) = iprot.readListBegin()
+          for _i555 in xrange(_size551):
+            _elem556 = iprot.readI64()
+            self.txnIds.append(_elem556)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11894,11 +12042,11 @@ class AllocateTableWriteIdsRequest:
       elif fid == 5:
         if ftype == TType.LIST:
           self.srcTxnToWriteIdList = []
-          (_etype553, _size550) = iprot.readListBegin()
-          for _i554 in xrange(_size550):
-            _elem555 = TxnToWriteId()
-            _elem555.read(iprot)
-            self.srcTxnToWriteIdList.append(_elem555)
+          (_etype560, _size557) = iprot.readListBegin()
+          for _i561 in xrange(_size557):
+            _elem562 = TxnToWriteId()
+            _elem562.read(iprot)
+            self.srcTxnToWriteIdList.append(_elem562)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -11923,8 +12071,8 @@ class AllocateTableWriteIdsRequest:
     if self.txnIds is not None:
       oprot.writeFieldBegin('txnIds', TType.LIST, 3)
       oprot.writeListBegin(TType.I64, len(self.txnIds))
-      for iter556 in self.txnIds:
-        oprot.writeI64(iter556)
+      for iter563 in self.txnIds:
+        oprot.writeI64(iter563)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.replPolicy is not None:
@@ -11934,8 +12082,8 @@ class AllocateTableWriteIdsRequest:
     if self.srcTxnToWriteIdList is not None:
       oprot.writeFieldBegin('srcTxnToWriteIdList', TType.LIST, 5)
       oprot.writeListBegin(TType.STRUCT, len(self.srcTxnToWriteIdList))
-      for iter557 in self.srcTxnToWriteIdList:
-        iter557.write(oprot)
+      for iter564 in self.srcTxnToWriteIdList:
+        iter564.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -12077,11 +12225,11 @@ class AllocateTableWriteIdsResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.txnToWriteIds = []
-          (_etype561, _size558) = iprot.readListBegin()
-          for _i562 in xrange(_size558):
-            _elem563 = TxnToWriteId()
-            _elem563.read(iprot)
-            self.txnToWriteIds.append(_elem563)
+          (_etype568, _size565) = iprot.readListBegin()
+          for _i569 in xrange(_size565):
+            _elem570 = TxnToWriteId()
+            _elem570.read(iprot)
+            self.txnToWriteIds.append(_elem570)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12098,8 +12246,8 @@ class AllocateTableWriteIdsResponse:
     if self.txnToWriteIds is not None:
       oprot.writeFieldBegin('txnToWriteIds', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.txnToWriteIds))
-      for iter564 in self.txnToWriteIds:
-        iter564.write(oprot)
+      for iter571 in self.txnToWriteIds:
+        iter571.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -12327,11 +12475,11 @@ class LockRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.component = []
-          (_etype568, _size565) = iprot.readListBegin()
-          for _i569 in xrange(_size565):
-            _elem570 = LockComponent()
-            _elem570.read(iprot)
-            self.component.append(_elem570)
+          (_etype575, _size572) = iprot.readListBegin()
+          for _i576 in xrange(_size572):
+            _elem577 = LockComponent()
+            _elem577.read(iprot)
+            self.component.append(_elem577)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -12368,8 +12516,8 @@ class LockRequest:
     if self.component is not None:
       oprot.writeFieldBegin('component', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.component))
-      for iter571 in self.component:
-        iter571.write(oprot)
+      for iter578 in self.component:
+        iter578.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.txnid is not None:
@@ -13067,11 +13215,11 @@ class ShowLocksResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.locks = []
-          (_etype575, _size572) = iprot.readListBegin()
-          for _i576 in xrange(_size572):
-            _elem577 = ShowLocksResponseElement()
-            _elem577.read(iprot)
-            self.locks.append(_elem577)
+          (_etype582, _size579) = iprot.readListBegin()
+          for _i583 in xrange(_size579):
+            _elem584 = ShowLocksResponseElement()
+            _elem584.read(iprot)
+            self.locks.append(_elem584)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -13088,8 +13236,8 @@ class ShowLocksResponse:
     if self.locks is not None:
       oprot.writeFieldBegin('locks', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.locks))
-      for iter578 in self.locks:
-        iter578.write(oprot)
+      for iter585 in self.locks:
+        iter585.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -13304,20 +13452,20 @@ class HeartbeatTxnRangeResponse:
       if fid == 1:
         if ftype == TType.SET:
           self.aborted = set()
-          (_etype582, _size579) = iprot.readSetBegin()
-          for _i583 in xrange(_size579):
-            _elem584 = iprot.readI64()
-            self.aborted.add(_elem584)
+          (_etype589, _size586) = iprot.readSetBegin()
+          for _i590 in xrange(_size586):
+            _elem591 = iprot.readI64()
+            self.aborted.add(_elem591)
           iprot.readSetEnd()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.SET:
           self.nosuch = set()
-          (_etype588, _size585) = iprot.readSetBegin()
-          for _i589 in xrange(_size585):
-            _elem590 = iprot.readI64()
-            self.nosuch.add(_elem590)
+          (_etype595, _size592) = iprot.readSetBegin()
+          for _i596 in xrange(_size592):
+            _elem597 = iprot.readI64()
+            self.nosuch.add(_elem597)
           iprot.readSetEnd()
         else:
           iprot.skip(ftype)
@@ -13334,15 +13482,15 @@ class HeartbeatTxnRangeResponse:
     if self.aborted is not None:
       oprot.writeFieldBegin('aborted', TType.SET, 1)
       oprot.writeSetBegin(TType.I64, len(self.aborted))
-      for iter591 in self.aborted:
-        oprot.writeI64(iter591)
+      for iter598 in self.aborted:
+        oprot.writeI64(iter598)
       oprot.writeSetEnd()
       oprot.writeFieldEnd()
     if self.nosuch is not None:
       oprot.writeFieldBegin('nosuch', TType.SET, 2)
       oprot.writeSetBegin(TType.I64, len(self.nosuch))
-      for iter592 in self.nosuch:
-        oprot.writeI64(iter592)
+      for iter599 in self.nosuch:
+        oprot.writeI64(iter599)
       oprot.writeSetEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -13439,11 +13587,11 @@ class CompactionRequest:
       elif fid == 6:
         if ftype == TType.MAP:
           self.properties = {}
-          (_ktype594, _vtype595, _size593 ) = iprot.readMapBegin()
-          for _i597 in xrange(_size593):
-            _key598 = iprot.readString()
-            _val599 = iprot.readString()
-            self.properties[_key598] = _val599
+          (_ktype601, _vtype602, _size600 ) = iprot.readMapBegin()
+          for _i604 in xrange(_size600):
+            _key605 = iprot.readString()
+            _val606 = iprot.readString()
+            self.properties[_key605] = _val606
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -13480,9 +13628,9 @@ class CompactionRequest:
     if self.properties is not None:
       oprot.writeFieldBegin('properties', TType.MAP, 6)
       oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.properties))
-      for kiter600,viter601 in self.properties.items():
-        oprot.writeString(kiter600)
-        oprot.writeString(viter601)
+      for kiter607,viter608 in self.properties.items():
+        oprot.writeString(kiter607)
+        oprot.writeString(viter608)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -13917,11 +14065,11 @@ class ShowCompactResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.compacts = []
-          (_etype605, _size602) = iprot.readListBegin()
-          for _i606 in xrange(_size602):
-            _elem607 = ShowCompactResponseElement()
-            _elem607.read(iprot)
-            self.compacts.append(_elem607)
+          (_etype612, _size609) = iprot.readListBegin()
+          for _i613 in xrange(_size609):
+            _elem614 = ShowCompactResponseElement()
+            _elem614.read(iprot)
+            self.compacts.append(_elem614)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -13938,8 +14086,8 @@ class ShowCompactResponse:
     if self.compacts is not None:
       oprot.writeFieldBegin('compacts', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.compacts))
-      for iter608 in self.compacts:
-        iter608.write(oprot)
+      for iter615 in self.compacts:
+        iter615.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -14028,10 +14176,10 @@ class AddDynamicPartitions:
       elif fid == 5:
         if ftype == TType.LIST:
           self.partitionnames = []
-          (_etype612, _size609) = iprot.readListBegin()
-          for _i613 in xrange(_size609):
-            _elem614 = iprot.readString()
-            self.partitionnames.append(_elem614)
+          (_etype619, _size616) = iprot.readListBegin()
+          for _i620 in xrange(_size616):
+            _elem621 = iprot.readString()
+            self.partitionnames.append(_elem621)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -14069,8 +14217,8 @@ class AddDynamicPartitions:
     if self.partitionnames is not None:
       oprot.writeFieldBegin('partitionnames', TType.LIST, 5)
       oprot.writeListBegin(TType.STRING, len(self.partitionnames))
-      for iter615 in self.partitionnames:
-        oprot.writeString(iter615)
+      for iter622 in self.partitionnames:
+        oprot.writeString(iter622)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.operationType is not None:
@@ -14300,10 +14448,10 @@ class CreationMetadata:
       elif fid == 4:
         if ftype == TType.SET:
           self.tablesUsed = set()
-          (_etype619, _size616) = iprot.readSetBegin()
-          for _i620 in xrange(_size616):
-            _elem621 = iprot.readString()
-            self.tablesUsed.add(_elem621)
+          (_etype626, _size623) = iprot.readSetBegin()
+          for _i627 in xrange(_size623):
+            _elem628 = iprot.readString()
+            self.tablesUsed.add(_elem628)
           iprot.readSetEnd()
         else:
           iprot.skip(ftype)
@@ -14337,8 +14485,8 @@ class CreationMetadata:
     if self.tablesUsed is not None:
       oprot.writeFieldBegin('tablesUsed', TType.SET, 4)
       oprot.writeSetBegin(TType.STRING, len(self.tablesUsed))
-      for iter622 in self.tablesUsed:
-        oprot.writeString(iter622)
+      for iter629 in self.tablesUsed:
+        oprot.writeString(iter629)
       oprot.writeSetEnd()
       oprot.writeFieldEnd()
     if self.validTxnList is not None:
@@ -14650,11 +14798,11 @@ class NotificationEventResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.events = []
-          (_etype626, _size623) = iprot.readListBegin()
-          for _i627 in xrange(_size623):
-            _elem628 = NotificationEvent()
-            _elem628.read(iprot)
-            self.events.append(_elem628)
+          (_etype633, _size630) = iprot.readListBegin()
+          for _i634 in xrange(_size630):
+            _elem635 = NotificationEvent()
+            _elem635.read(iprot)
+            self.events.append(_elem635)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -14671,8 +14819,8 @@ class NotificationEventResponse:
     if self.events is not None:
       oprot.writeFieldBegin('events', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.events))
-      for iter629 in self.events:
-        iter629.write(oprot)
+      for iter636 in self.events:
+        iter636.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -14966,20 +15114,20 @@ class InsertEventRequestData:
       elif fid == 2:
         if ftype == TType.LIST:
           self.filesAdded = []
-          (_etype633, _size630) = iprot.readListBegin()
-          for _i634 in xrange(_size630):
-            _elem635 = iprot.readString()
-            self.filesAdded.append(_elem635)
+          (_etype640, _size637) = iprot.readListBegin()
+          for _i641 in xrange(_size637):
+            _elem642 = iprot.readString()
+            self.filesAdded.append(_elem642)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.LIST:
           self.filesAddedChecksum = []
-          (_etype639, _size636) = iprot.readListBegin()
-          for _i640 in xrange(_size636):
-            _elem641 = iprot.readString()
-            self.filesAddedChecksum.append(_elem641)
+          (_etype646, _size643) = iprot.readListBegin()
+          for _i647 in xrange(_size643):
+            _elem648 = iprot.readString()
+            self.filesAddedChecksum.append(_elem648)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -15000,15 +15148,15 @@ class InsertEventRequestData:
     if self.filesAdded is not None:
       oprot.writeFieldBegin('filesAdded', TType.LIST, 2)
       oprot.writeListBegin(TType.STRING, len(self.filesAdded))
-      for iter642 in self.filesAdded:
-        oprot.writeString(iter642)
+      for iter649 in self.filesAdded:
+        oprot.writeString(iter649)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.filesAddedChecksum is not None:
       oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 3)
       oprot.writeListBegin(TType.STRING, len(self.filesAddedChecksum))
-      for iter643 in self.filesAddedChecksum:
-        oprot.writeString(iter643)
+      for iter650 in self.filesAddedChecksum:
+        oprot.writeString(iter650)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -15166,10 +15314,10 @@ class FireEventRequest:
       elif fid == 5:
         if ftype == TType.LIST:
           self.partitionVals = []
-          (_etype647, _size644) = iprot.readListBegin()
-          for _i648 in xrange(_size644):
-            _elem649 = iprot.readString()
-            self.partitionVals.append(_elem649)
+          (_etype654, _size651) = iprot.readListBegin()
+          for _i655 in xrange(_size651):
+            _elem656 = iprot.readString()
+            self.partitionVals.append(_elem656)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -15207,8 +15355,8 @@ class FireEventRequest:
     if self.partitionVals is not None:
       oprot.writeFieldBegin('partitionVals', TType.LIST, 5)
       oprot.writeListBegin(TType.STRING, len(self.partitionVals))
-      for iter650 in self.partitionVals:
-        oprot.writeString(iter650)
+      for iter657 in self.partitionVals:
+        oprot.writeString(iter657)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.catName is not None:
@@ -15400,12 +15548,12 @@ class GetFileMetadataByExprResult:
       if fid == 1:
         if ftype == TType.MAP:
           self.metadata = {}
-          (_ktype652, _vtype653, _size651 ) = iprot.readMapBegin()
-          for _i655 in xrange(_size651):
-            _key656 = iprot.readI64()
-            _val657 = MetadataPpdResult()
-            _val657.read(iprot)
-            self.metadata[_key656] = _val657
+          (_ktype659, _vtype660, _size658 ) = iprot.readMapBegin()
+          for _i662 in xrange(_size658):
+            _key663 = iprot.readI64()
+            _val664 = MetadataPpdResult()
+            _val664.read(iprot)
+            self.metadata[_key663] = _val664
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -15427,9 +15575,9 @@ class GetFileMetadataByExprResult:
     if self.metadata is not None:
       oprot.writeFieldBegin('metadata', TType.MAP, 1)
       oprot.writeMapBegin(TType.I64, TType.STRUCT, len(self.metadata))
-      for kiter658,viter659 in self.metadata.items():
-        oprot.writeI64(kiter658)
-        viter659.write(oprot)
+      for kiter665,viter666 in self.metadata.items():
+        oprot.writeI64(kiter665)
+        viter666.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.isSupported is not None:
@@ -15499,10 +15647,10 @@ class GetFileMetadataByExprRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype663, _size660) = iprot.readListBegin()
-          for _i664 in xrange(_size660):
-            _elem665 = iprot.readI64()
-            self.fileIds.append(_elem665)
+          (_etype670, _size667) = iprot.readListBegin()
+          for _i671 in xrange(_size667):
+            _elem672 = iprot.readI64()
+            self.fileIds.append(_elem672)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -15534,8 +15682,8 @@ class GetFileMetadataByExprRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter666 in self.fileIds:
-        oprot.writeI64(iter666)
+      for iter673 in self.fileIds:
+        oprot.writeI64(iter673)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.expr is not None:
@@ -15609,11 +15757,11 @@ class GetFileMetadataResult:
       if fid == 1:
         if ftype == TType.MAP:
           self.metadata = {}
-          (_ktype668, _vtype669, _size667 ) = iprot.readMapBegin()
-          for _i671 in xrange(_size667):
-            _key672 = iprot.readI64()
-            _val673 = iprot.readString()
-            self.metadata[_key672] = _val673
+          (_ktype675, _vtype676, _size674 ) = iprot.readMapBegin()
+          for _i678 in xrange(_size674):
+            _key679 = iprot.readI64()
+            _val680 = iprot.readString()
+            self.metadata[_key679] = _val680
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -15635,9 +15783,9 @@ class GetFileMetadataResult:
     if self.metadata is not None:
       oprot.writeFieldBegin('metadata', TType.MAP, 1)
       oprot.writeMapBegin(TType.I64, TType.STRING, len(self.metadata))
-      for kiter674,viter675 in self.metadata.items():
-        oprot.writeI64(kiter674)
-        oprot.writeString(viter675)
+      for kiter681,viter682 in self.metadata.items():
+        oprot.writeI64(kiter681)
+        oprot.writeString(viter682)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.isSupported is not None:
@@ -15698,10 +15846,10 @@ class GetFileMetadataRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype679, _size676) = iprot.readListBegin()
-          for _i680 in xrange(_size676):
-            _elem681 = iprot.readI64()
-            self.fileIds.append(_elem681)
+          (_etype686, _size683) = iprot.readListBegin()
+          for _i687 in xrange(_size683):
+            _elem688 = iprot.readI64()
+            self.fileIds.append(_elem688)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -15718,8 +15866,8 @@ class GetFileMetadataRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter682 in self.fileIds:
-        oprot.writeI64(iter682)
+      for iter689 in self.fileIds:
+        oprot.writeI64(iter689)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -15825,20 +15973,20 @@ class PutFileMetadataRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype686, _size683) = iprot.readListBegin()
-          for _i687 in xrange(_size683):
-            _elem688 = iprot.readI64()
-            self.fileIds.append(_elem688)
+          (_etype693, _size690) = iprot.readListBegin()
+          for _i694 in xrange(_size690):
+            _elem695 = iprot.readI64()
+            self.fileIds.append(_elem695)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.LIST:
           self.metadata = []
-          (_etype692, _size689) = iprot.readListBegin()
-          for _i693 in xrange(_size689):
-            _elem694 = iprot.readString()
-            self.metadata.append(_elem694)
+          (_etype699, _size696) = iprot.readListBegin()
+          for _i700 in xrange(_size696):
+            _elem701 = iprot.readString()
+            self.metadata.append(_elem701)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -15860,15 +16008,15 @@ class PutFileMetadataRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter695 in self.fileIds:
-        oprot.writeI64(iter695)
+      for iter702 in self.fileIds:
+        oprot.writeI64(iter702)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.metadata is not None:
       oprot.writeFieldBegin('metadata', TType.LIST, 2)
       oprot.writeListBegin(TType.STRING, len(self.metadata))
-      for iter696 in self.metadata:
-        oprot.writeString(iter696)
+      for iter703 in self.metadata:
+        oprot.writeString(iter703)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.type is not None:
@@ -15976,10 +16124,10 @@ class ClearFileMetadataRequest:
       if fid == 1:
         if ftype == TType.LIST:
           self.fileIds = []
-          (_etype700, _size697) = iprot.readListBegin()
-          for _i701 in xrange(_size697):
-            _elem702 = iprot.readI64()
-            self.fileIds.append(_elem702)
+          (_etype707, _size704) = iprot.readListBegin()
+          for _i708 in xrange(_size704):
+            _elem709 = iprot.readI64()
+            self.fileIds.append(_elem709)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -15996,8 +16144,8 @@ class ClearFileMetadataRequest:
     if self.fileIds is not None:
       oprot.writeFieldBegin('fileIds', TType.LIST, 1)
       oprot.writeListBegin(TType.I64, len(self.fileIds))
-      for iter703 in self.fileIds:
-        oprot.writeI64(iter703)
+      for iter710 in self.fileIds:
+        oprot.writeI64(iter710)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -16226,11 +16374,11 @@ class GetAllFunctionsResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.functions = []
-          (_etype707, _size704) = iprot.readListBegin()
-          for _i708 in xrange(_size704):
-            _elem709 = Function()
-            _elem709.read(iprot)
-            self.functions.append(_elem709)
+          (_etype714, _size711) = iprot.readListBegin()
+          for _i715 in xrange(_size711):
+            _elem716 = Function()
+            _elem716.read(iprot)
+            self.functions.append(_elem716)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -16247,8 +16395,8 @@ class GetAllFunctionsResponse:
     if self.functions is not None:
       oprot.writeFieldBegin('functions', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.functions))
-      for iter710 in self.functions:
-        iter710.write(oprot)
+      for iter717 in self.functions:
+        iter717.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -16300,10 +16448,10 @@ class ClientCapabilities:
       if fid == 1:
         if ftype == TType.LIST:
           self.values = []
-          (_etype714, _size711) = iprot.readListBegin()
-          for _i715 in xrange(_size711):
-            _elem716 = iprot.readI32()
-            self.values.append(_elem716)
+          (_etype721, _size718) = iprot.readListBegin()
+          for _i722 in xrange(_size718):
+            _elem723 = iprot.readI32()
+            self.values.append(_elem723)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -16320,8 +16468,8 @@ class ClientCapabilities:
     if self.values is not None:
       oprot.writeFieldBegin('values', TType.LIST, 1)
       oprot.writeListBegin(TType.I32, len(self.values))
-      for iter717 in self.values:
-        oprot.writeI32(iter717)
+      for iter724 in self.values:
+        oprot.writeI32(iter724)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -16566,10 +16714,10 @@ class GetTablesRequest:
       elif fid == 2:
         if ftype == TType.LIST:
           self.tblNames = []
-          (_etype721, _size718) = iprot.readListBegin()
-          for _i722 in xrange(_size718):
-            _elem723 = iprot.readString()
-            self.tblNames.append(_elem723)
+          (_etype728, _size725) = iprot.readListBegin()
+          for _i729 in xrange(_size725):
+            _elem730 = iprot.readString()
+            self.tblNames.append(_elem730)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -16601,8 +16749,8 @@ class GetTablesRequest:
     if self.tblNames is not None:
       oprot.writeFieldBegin('tblNames', TType.LIST, 2)
       oprot.writeListBegin(TType.STRING, len(self.tblNames))
-      for iter724 in self.tblNames:
-        oprot.writeString(iter724)
+      for iter731 in self.tblNames:
+        oprot.writeString(iter731)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.capabilities is not None:
@@ -16667,11 +16815,11 @@ class GetTablesResult:
       if fid == 1:
         if ftype == TType.LIST:
           self.tables = []
-          (_etype728, _size725) = iprot.readListBegin()
-          for _i729 in xrange(_size725):
-            _elem730 = Table()
-            _elem730.read(iprot)
-            self.tables.append(_elem730)
+          (_etype735, _size732) = iprot.readListBegin()
+          for _i736 in xrange(_size732):
+            _elem737 = Table()
+            _elem737.read(iprot)
+            self.tables.append(_elem737)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -16688,8 +16836,8 @@ class GetTablesResult:
     if self.tables is not None:
       oprot.writeFieldBegin('tables', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.tables))
-      for iter731 in self.tables:
-        iter731.write(oprot)
+      for iter738 in self.tables:
+        iter738.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -17003,10 +17151,10 @@ class Materialization:
       if fid == 1:
         if ftype == TType.SET:
           self.tablesUsed = set()
-          (_etype735, _size732) = iprot.readSetBegin()
-          for _i736 in xrange(_size732):
-            _elem737 = iprot.readString()
-            self.tablesUsed.add(_elem737)
+          (_etype742, _size739) = iprot.readSetBegin()
+          for _i743 in xrange(_size739):
+            _elem744 = iprot.readString()
+            self.tablesUsed.add(_elem744)
           iprot.readSetEnd()
         else:
           iprot.skip(ftype)
@@ -17038,8 +17186,8 @@ class Materialization:
     if self.tablesUsed is not None:
       oprot.writeFieldBegin('tablesUsed', TType.SET, 1)
       oprot.writeSetBegin(TType.STRING, len(self.tablesUsed))
-      for iter738 in self.tablesUsed:
-        oprot.writeString(iter738)
+      for iter745 in self.tablesUsed:
+        oprot.writeString(iter745)
       oprot.writeSetEnd()
       oprot.writeFieldEnd()
     if self.validTxnList is not None:
@@ -17944,44 +18092,44 @@ class WMFullResourcePlan:
       elif fid == 2:
         if ftype == TType.LIST:
           self.pools = []
-          (_etype742, _size739) = iprot.readListBegin()
-          for _i743 in xrange(_size739):
-            _elem744 = WMPool()
-            _elem744.read(iprot)
-            self.pools.append(_elem744)
+          (_etype749, _size746) = iprot.readListBegin()
+          for _i750 in xrange(_size746):
+            _elem751 = WMPool()
+            _elem751.read(iprot)
+            self.pools.append(_elem751)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.LIST:
           self.mappings = []
-          (_etype748, _size745) = iprot.readListBegin()
-          for _i749 in xrange(_size745):
-            _elem750 = WMMapping()
-            _elem750.read(iprot)
-            self.mappings.append(_elem750)
+          (_etype755, _size752) = iprot.readListBegin()
+          for _i756 in xrange(_size752):
+            _elem757 = WMMapping()
+            _elem757.read(iprot)
+            self.mappings.append(_elem757)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.LIST:
           self.triggers = []
-          (_etype754, _size751) = iprot.readListBegin()
-          for _i755 in xrange(_size751):
-            _elem756 = WMTrigger()
-            _elem756.read(iprot)
-            self.triggers.append(_elem756)
+          (_etype761, _size758) = iprot.readListBegin()
+          for _i762 in xrange(_size758):
+            _elem763 = WMTrigger()
+            _elem763.read(iprot)
+            self.triggers.append(_elem763)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.LIST:
           self.poolTriggers = []
-          (_etype760, _size757) = iprot.readListBegin()
-          for _i761 in xrange(_size757):
-            _elem762 = WMPoolTrigger()
-            _elem762.read(iprot)
-            self.poolTriggers.append(_elem762)
+          (_etype767, _size764) = iprot.readListBegin()
+          for _i768 in xrange(_size764):
+            _elem769 = WMPoolTrigger()
+            _elem769.read(iprot)
+            self.poolTriggers.append(_elem769)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -18002,29 +18150,29 @@ class WMFullResourcePlan:
     if self.pools is not None:
       oprot.writeFieldBegin('pools', TType.LIST, 2)
       oprot.writeListBegin(TType.STRUCT, len(self.pools))
-      for iter763 in self.pools:
-        iter763.write(oprot)
+      for iter770 in self.pools:
+        iter770.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.mappings is not None:
       oprot.writeFieldBegin('mappings', TType.LIST, 3)
       oprot.writeListBegin(TType.STRUCT, len(self.mappings))
-      for iter764 in self.mappings:
-        iter764.write(oprot)
+      for iter771 in self.mappings:
+        iter771.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.triggers is not None:
       oprot.writeFieldBegin('triggers', TType.LIST, 4)
       oprot.writeListBegin(TType.STRUCT, len(self.triggers))
-      for iter765 in self.triggers:
-        iter765.write(oprot)
+      for iter772 in self.triggers:
+        iter772.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.poolTriggers is not None:
       oprot.writeFieldBegin('poolTriggers', TType.LIST, 5)
       oprot.writeListBegin(TType.STRUCT, len(self.poolTriggers))
-      for iter766 in self.poolTriggers:
-        iter766.write(oprot)
+      for iter773 in self.poolTriggers:
+        iter773.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -18498,11 +18646,11 @@ class WMGetAllResourcePlanResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.resourcePlans = []
-          (_etype770, _size767) = iprot.readListBegin()
-          for _i771 in xrange(_size767):
-            _elem772 = WMResourcePlan()
-            _elem772.read(iprot)
-            self.resourcePlans.append(_elem772)
+          (_etype777, _size774) = iprot.readListBegin()
+          for _i778 in xrange(_size774):
+            _elem779 = WMResourcePlan()
+            _elem779.read(iprot)
+            self.resourcePlans.append(_elem779)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -18519,8 +18667,8 @@ class WMGetAllResourcePlanResponse:
     if self.resourcePlans is not None:
       oprot.writeFieldBegin('resourcePlans', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.resourcePlans))
-      for iter773 in self.resourcePlans:
-        iter773.write(oprot)
+      for iter780 in self.resourcePlans:
+        iter780.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -18824,20 +18972,20 @@ class WMValidateResourcePlanResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.errors = []
-          (_etype777, _size774) = iprot.readListBegin()
-          for _i778 in xrange(_size774):
-            _elem779 = iprot.readString()
-            self.errors.append(_elem779)
+          (_etype784, _size781) = iprot.readListBegin()
+          for _i785 in xrange(_size781):
+            _elem786 = iprot.readString()
+            self.errors.append(_elem786)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.LIST:
           self.warnings = []
-          (_etype783, _size780) = iprot.readListBegin()
-          for _i784 in xrange(_size780):
-            _elem785 = iprot.readString()
-            self.warnings.append(_elem785)
+          (_etype790, _size787) = iprot.readListBegin()
+          for _i791 in xrange(_size787):
+            _elem792 = iprot.readString()
+            self.warnings.append(_elem792)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -18854,15 +19002,15 @@ class WMValidateResourcePlanResponse:
     if self.errors is not None:
       oprot.writeFieldBegin('errors', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.errors))
-      for iter786 in self.errors:
-        oprot.writeString(iter786)
+      for iter793 in self.errors:
+        oprot.writeString(iter793)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.warnings is not None:
       oprot.writeFieldBegin('warnings', TType.LIST, 2)
       oprot.writeListBegin(TType.STRING, len(self.warnings))
-      for iter787 in self.warnings:
-        oprot.writeString(iter787)
+      for iter794 in self.warnings:
+        oprot.writeString(iter794)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -19439,11 +19587,11 @@ class WMGetTriggersForResourePlanResponse:
       if fid == 1:
         if ftype == TType.LIST:
           self.triggers = []
-          (_etype791, _size788) = iprot.readListBegin()
-          for _i792 in xrange(_size788):
-            _elem793 = WMTrigger()
-            _elem793.read(iprot)
-            self.triggers.append(_elem793)
+          (_etype798, _size795) = iprot.readListBegin()
+          for _i799 in xrange(_size795):
+            _elem800 = WMTrigger()
+            _elem800.read(iprot)
+            self.triggers.append(_elem800)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -19460,8 +19608,8 @@ class WMGetTriggersForResourePlanResponse:
     if self.triggers is not None:
       oprot.writeFieldBegin('triggers', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.triggers))
-      for iter794 in self.triggers:
-        iter794.write(oprot)
+      for iter801 in self.triggers:
+        iter801.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -20645,11 +20793,11 @@ class SchemaVersion:
       elif fid == 4:
         if ftype == TType.LIST:
           self.cols = []
-          (_etype798, _size795) = iprot.readListBegin()
-          for _i799 in xrange(_size795):
-            _elem800 = FieldSchema()
-            _elem800.read(iprot)
-            self.cols.append(_elem800)
+          (_etype805, _size802) = iprot.readListBegin()
+          for _i806 in xrange(_size802):
+            _elem807 = FieldSchema()
+            _elem807.read(iprot)
+            self.cols.append(_elem807)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -20709,8 +20857,8 @@ class SchemaVersion:
     if self.cols is not None:
       oprot.writeFieldBegin('cols', TType.LIST, 4)
       oprot.writeListBegin(TType.STRUCT, len(self.cols))
-      for iter801 in self.cols:
-        iter801.write(oprot)
+      for iter808 in self.cols:
+        iter808.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.state is not None:
@@ -20965,11 +21113,11 @@ class FindSchemasByColsResp:
       if fid == 1:
         if ftype == TType.LIST:
           self.schemaVersions = []
-          (_etype805, _size802) = iprot.readListBegin()
-          for _i806 in xrange(_size802):
-            _elem807 = SchemaVersionDescriptor()
-            _elem807.read(iprot)
-            self.schemaVersions.append(_elem807)
+          (_etype812, _size809) = iprot.readListBegin()
+          for _i813 in xrange(_size809):
+            _elem814 = SchemaVersionDescriptor()
+            _elem814.read(iprot)
+            self.schemaVersions.append(_elem814)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -20986,8 +21134,8 @@ class FindSchemasByColsResp:
     if self.schemaVersions is not None:
       oprot.writeFieldBegin('schemaVersions', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.schemaVersions))
-      for iter808 in self.schemaVersions:
-        iter808.write(oprot)
+      for iter815 in self.schemaVersions:
+        iter815.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 2687ce5..21dc708 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2572,6 +2572,37 @@ class CommitTxnRequest
   ::Thrift::Struct.generate_accessors self
 end
 
+class ReplTblWriteIdStateRequest
+  include ::Thrift::Struct, ::Thrift::Struct_Union
+  VALIDWRITEIDLIST = 1
+  USER = 2
+  HOSTNAME = 3
+  DBNAME = 4
+  TABLENAME = 5
+  PARTNAMES = 6
+
+  FIELDS = {
+    VALIDWRITEIDLIST => {:type => ::Thrift::Types::STRING, :name => 'validWriteIdlist'},
+    USER => {:type => ::Thrift::Types::STRING, :name => 'user'},
+    HOSTNAME => {:type => ::Thrift::Types::STRING, :name => 'hostName'},
+    DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+    TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
+    PARTNAMES => {:type => ::Thrift::Types::LIST, :name => 'partNames', :element => {:type => ::Thrift::Types::STRING}, :optional => true}
+  }
+
+  def struct_fields; FIELDS; end
+
+  def validate
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field validWriteIdlist is unset!') unless @validWriteIdlist
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field user is unset!') unless @user
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field hostName is unset!') unless @hostName
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName
+    raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tableName is unset!') unless @tableName
+  end
+
+  ::Thrift::Struct.generate_accessors self
+end
+
 class GetValidWriteIdsRequest
   include ::Thrift::Struct, ::Thrift::Struct_Union
   FULLTABLENAMES = 1

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index 4de8bd3..7946b6c 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -2437,6 +2437,20 @@ module ThriftHiveMetastore
       return
     end
 
+    def repl_tbl_writeid_state(rqst)
+      send_repl_tbl_writeid_state(rqst)
+      recv_repl_tbl_writeid_state()
+    end
+
+    def send_repl_tbl_writeid_state(rqst)
+      send_message('repl_tbl_writeid_state', Repl_tbl_writeid_state_args, :rqst => rqst)
+    end
+
+    def recv_repl_tbl_writeid_state()
+      result = receive_message(Repl_tbl_writeid_state_result)
+      return
+    end
+
     def get_valid_write_ids(rqst)
       send_get_valid_write_ids(rqst)
       return recv_get_valid_write_ids()
@@ -5273,6 +5287,13 @@ module ThriftHiveMetastore
       write_result(result, oprot, 'commit_txn', seqid)
     end
 
+    def process_repl_tbl_writeid_state(seqid, iprot, oprot)
+      args = read_args(iprot, Repl_tbl_writeid_state_args)
+      result = Repl_tbl_writeid_state_result.new()
+      @handler.repl_tbl_writeid_state(args.rqst)
+      write_result(result, oprot, 'repl_tbl_writeid_state', seqid)
+    end
+
     def process_get_valid_write_ids(seqid, iprot, oprot)
       args = read_args(iprot, Get_valid_write_ids_args)
       result = Get_valid_write_ids_result.new()
@@ -11467,6 +11488,37 @@ module ThriftHiveMetastore
     ::Thrift::Struct.generate_accessors self
   end
 
+  class Repl_tbl_writeid_state_args
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+    RQST = 1
+
+    FIELDS = {
+      RQST => {:type => ::Thrift::Types::STRUCT, :name => 'rqst', :class => ::ReplTblWriteIdStateRequest}
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
+  class Repl_tbl_writeid_state_result
+    include ::Thrift::Struct, ::Thrift::Struct_Union
+
+    FIELDS = {
+
+    }
+
+    def struct_fields; FIELDS; end
+
+    def validate
+    end
+
+    ::Thrift::Struct.generate_accessors self
+  end
+
   class Get_valid_write_ids_args
     include ::Thrift::Struct, ::Thrift::Struct_Union
     RQST = 1

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 397a081..92cd131 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -7022,6 +7022,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
     }
 
     @Override
+    public void repl_tbl_writeid_state(ReplTblWriteIdStateRequest rqst) throws TException {
+      getTxnHandler().replTableWriteIdState(rqst);
+    }
+
+    @Override
     public GetValidWriteIdsResponse get_valid_write_ids(GetValidWriteIdsRequest rqst) throws TException {
       return getTxnHandler().getValidWriteIds(rqst);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 1138ed3..70080b1 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2487,6 +2487,33 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
   }
 
   @Override
+  public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+          throws TException {
+    String user;
+    try {
+      user = UserGroupInformation.getCurrentUser().getUserName();
+    } catch (IOException e) {
+      LOG.error("Unable to resolve current user name " + e.getMessage());
+      throw new RuntimeException(e);
+    }
+
+    String hostName;
+    try {
+      hostName = InetAddress.getLocalHost().getHostName();
+    } catch (UnknownHostException e) {
+      LOG.error("Unable to resolve my host name " + e.getMessage());
+      throw new RuntimeException(e);
+    }
+
+    ReplTblWriteIdStateRequest rqst
+            = new ReplTblWriteIdStateRequest(validWriteIdList, user, hostName, dbName, tableName);
+    if (partNames != null) {
+      rqst.setPartNames(partNames);
+    }
+    client.repl_tbl_writeid_state(rqst);
+  }
+
+  @Override
   public long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException {
     return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName).get(0).getWriteId();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 72b814d..0f302a6 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -2820,14 +2820,14 @@ public interface IMetaStoreClient {
   /**
    * Rollback a transaction.  This will also unlock any locks associated with
    * this transaction.
-   * @param txnid id of transaction to be rolled back.
+   * @param srcTxnid id of transaction at source while is rolled back and to be replicated.
    * @param replPolicy the replication policy to identify the source cluster
    * @throws NoSuchTxnException if the requested transaction does not exist.
    * Note that this can result from the transaction having timed out and been
    * deleted.
    * @throws TException
    */
-  void replRollbackTxn(long txnid, String replPolicy) throws NoSuchTxnException, TException;
+  void replRollbackTxn(long srcTxnid, String replPolicy) throws NoSuchTxnException, TException;
 
   /**
    * Commit a transaction.  This will also unlock any locks associated with
@@ -2846,7 +2846,7 @@ public interface IMetaStoreClient {
   /**
    * Commit a transaction.  This will also unlock any locks associated with
    * this transaction.
-   * @param txnid id of transaction to be committed.
+   * @param srcTxnid id of transaction at source which is committed and to be replicated.
    * @param replPolicy the replication policy to identify the source cluster
    * @throws NoSuchTxnException if the requested transaction does not exist.
    * This can result fro the transaction having timed out and been deleted by
@@ -2855,7 +2855,7 @@ public interface IMetaStoreClient {
    * aborted.  This can result from the transaction timing out.
    * @throws TException
    */
-  void replCommitTxn(long txnid, String replPolicy)
+  void replCommitTxn(long srcTxnid, String replPolicy)
           throws NoSuchTxnException, TxnAbortedException, TException;
 
   /**
@@ -2874,6 +2874,17 @@ public interface IMetaStoreClient {
   long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException;
 
   /**
+   * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
+   * @param validWriteIdList Snapshot of writeid list when the table/partition is dumped.
+   * @param dbName Database name
+   * @param tableName Table which is written.
+   * @param partNames List of partitions being written.
+   * @throws TException in case of failure to replicate the writeid state
+   */
+  void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+          throws TException;
+
+  /**
    * Allocate a per table write ID and associate it with the given transaction.
    * @param txnIds ids of transaction batchto which the allocated write ID to be associated.
    * @param dbName name of DB in which the table belongs.

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 7c1d5f5..abe1226 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -64,25 +64,24 @@ public class ReplChangeManager {
   }
 
   public static class FileInfo {
-    FileSystem srcFs;
-    Path sourcePath;
-    Path cmPath;
-    String checkSum;
-    boolean useSourcePath;
-
-    public FileInfo(FileSystem srcFs, Path sourcePath) {
-      this.srcFs = srcFs;
-      this.sourcePath = sourcePath;
-      this.cmPath = null;
-      this.checkSum = null;
-      this.useSourcePath = true;
+    private FileSystem srcFs;
+    private Path sourcePath;
+    private Path cmPath;
+    private String checkSum;
+    private boolean useSourcePath;
+    private String subDir;
+
+    public FileInfo(FileSystem srcFs, Path sourcePath, String subDir) {
+      this(srcFs, sourcePath, null, null, true, subDir);
     }
-    public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath, String checkSum, boolean useSourcePath) {
+    public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath,
+                    String checkSum, boolean useSourcePath, String subDir) {
       this.srcFs = srcFs;
       this.sourcePath = sourcePath;
       this.cmPath = cmPath;
       this.checkSum = checkSum;
       this.useSourcePath = useSourcePath;
+      this.subDir = subDir;
     }
     public FileSystem getSrcFs() {
       return srcFs;
@@ -102,6 +101,9 @@ public class ReplChangeManager {
     public void setIsUseSourcePath(boolean useSourcePath) {
       this.useSourcePath = useSourcePath;
     }
+    public String getSubDir() {
+      return subDir;
+    }
     public Path getEffectivePath() {
       if (useSourcePath) {
         return sourcePath;
@@ -301,20 +303,21 @@ public class ReplChangeManager {
    * matches, return the file; otherwise, use chksumString to retrieve it from cmroot
    * @param src Original file location
    * @param checksumString Checksum of the original file
+   * @param subDir Sub directory to which the source file belongs to
    * @param conf
    * @return Corresponding FileInfo object
    */
-  public static FileInfo getFileInfo(Path src, String checksumString, Configuration conf)
+  public static FileInfo getFileInfo(Path src, String checksumString, String subDir, Configuration conf)
           throws MetaException {
     try {
       FileSystem srcFs = src.getFileSystem(conf);
       if (checksumString == null) {
-        return new FileInfo(srcFs, src);
+        return new FileInfo(srcFs, src, subDir);
       }
 
       Path cmPath = getCMPath(conf, src.getName(), checksumString);
       if (!srcFs.exists(src)) {
-        return new FileInfo(srcFs, src, cmPath, checksumString, false);
+        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
       }
 
       String currentChecksumString;
@@ -322,12 +325,12 @@ public class ReplChangeManager {
         currentChecksumString = checksumFor(src, srcFs);
       } catch (IOException ex) {
         // If the file is missing or getting modified, then refer CM path
-        return new FileInfo(srcFs, src, cmPath, checksumString, false);
+        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
       }
       if ((currentChecksumString == null) || checksumString.equals(currentChecksumString)) {
-        return new FileInfo(srcFs, src, cmPath, checksumString, true);
+        return new FileInfo(srcFs, src, cmPath, checksumString, true, subDir);
       } else {
-        return new FileInfo(srcFs, src, cmPath, checksumString, false);
+        return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
       }
     } catch (IOException e) {
       throw new MetaException(StringUtils.stringifyException(e));
@@ -335,19 +338,24 @@ public class ReplChangeManager {
   }
 
   /***
-   * Concatenate filename and checksum with "#"
+   * Concatenate filename, checksum  and subdirectory with "#"
    * @param fileUriStr Filename string
    * @param fileChecksum Checksum string
+   * @param encodedSubDir sub directory path into which this file belongs to. Here encoded means,
+   *                      the multiple levels of subdirectories are concatenated with path separator "/"
    * @return Concatenated Uri string
    */
   // TODO: this needs to be enhanced once change management based filesystem is implemented
-  // Currently using fileuri#checksum as the format
-  static public String encodeFileUri(String fileUriStr, String fileChecksum) {
+  // Currently using fileuri#checksum#subdirs as the format
+  public static String encodeFileUri(String fileUriStr, String fileChecksum, String encodedSubDir) {
+    String encodedUri = fileUriStr;
     if (fileChecksum != null) {
-      return fileUriStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
-    } else {
-      return fileUriStr;
+      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum;
+    }
+    if (encodedSubDir != null) {
+      encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + encodedSubDir;
     }
+    return encodedUri;
   }
 
   /***
@@ -357,11 +365,14 @@ public class ReplChangeManager {
    */
   static public String[] getFileWithChksumFromURI(String fileURIStr) {
     String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR);
-    String[] result = new String[2];
+    String[] result = new String[3];
     result[0] = uriAndFragment[0];
     if (uriAndFragment.length>1) {
       result[1] = uriAndFragment[1];
     }
+    if (uriAndFragment.length>2) {
+      result[2] = uriAndFragment[2];
+    }
     return result;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index db596a6..c513b4d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
@@ -104,6 +105,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
@@ -557,7 +559,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
-      ResultSet rs = null;
       try {
         lockInternal();
         /**
@@ -583,111 +584,124 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if (numTxns > maxTxns) numTxns = maxTxns;
 
         stmt = dbConn.createStatement();
+        List<Long> txnIds = openTxns(dbConn, stmt, rqst);
 
-        if (rqst.isSetReplPolicy()) {
-          List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt);
-          if (!targetTxnIdList.isEmpty()) {
-            if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
-              LOG.warn("target txn id number " + targetTxnIdList.toString() +
-                      " is not matching with source txn id number " + rqst.getReplSrcTxnIds().toString());
-            }
-            LOG.info("Target transactions " + targetTxnIdList.toString() + " are present for repl policy :" +
-              rqst.getReplPolicy() + " and Source transaction id : " + rqst.getReplSrcTxnIds().toString());
-            return new OpenTxnsResponse(targetTxnIdList);
-          }
-        }
-
-        String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction database not properly " +
-            "configured, can't find next transaction id.");
-        }
-        long first = rs.getLong(1);
-        s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
+        LOG.debug("Going to commit");
+        dbConn.commit();
+        return new OpenTxnsResponse(txnIds);
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
+        throw new MetaException("Unable to select from transaction database "
+          + StringUtils.stringifyException(e));
+      } finally {
+        close(null, stmt, dbConn);
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      return openTxns(rqst);
+    }
+  }
 
-        long now = getDbTime(dbConn);
-        List<Long> txnIds = new ArrayList<>(numTxns);
+  private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst)
+          throws SQLException, MetaException {
+    int numTxns = rqst.getNum_txns();
+    ResultSet rs = null;
+    try {
+      if (rqst.isSetReplPolicy()) {
+        List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt);
 
-        List<String> rows = new ArrayList<>();
-        for (long i = first; i < first + numTxns; i++) {
-          txnIds.add(i);
-          rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + "," + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()));
-        }
-        List<String> queries = sqlGenerator.createInsertValuesStmt(
-          "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host)", rows);
-        for (String q : queries) {
-          LOG.debug("Going to execute update <" + q + ">");
-          stmt.execute(q);
+        if (!targetTxnIdList.isEmpty()) {
+          if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
+            LOG.warn("target txn id number " + targetTxnIdList.toString() +
+                    " is not matching with source txn id number " + rqst.getReplSrcTxnIds().toString());
+          }
+          LOG.info("Target transactions " + targetTxnIdList.toString() + " are present for repl policy :" +
+                  rqst.getReplPolicy() + " and Source transaction id : " + rqst.getReplSrcTxnIds().toString());
+          return targetTxnIdList;
         }
+      }
 
-        // Need to register minimum open txnid for current transactions into MIN_HISTORY table.
-        s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN);
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new IllegalStateException("Scalar query returned no rows?!?!!");
-        }
+      String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
+      LOG.debug("Going to execute query <" + s + ">");
+      rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        throw new MetaException("Transaction database not properly " +
+                "configured, can't find next transaction id.");
+      }
+      long first = rs.getLong(1);
+      s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
 
-        // TXNS table should have atleast one entry because we just inserted the newly opened txns.
-        // So, min(txn_id) would be a non-zero txnid.
-        long minOpenTxnId = rs.getLong(1);
-        assert(minOpenTxnId > 0);
-        rows.clear();
-        for (long txnId = first; txnId < first + numTxns; txnId++) {
-          rows.add(txnId + ", " + minOpenTxnId);
-        }
+      long now = getDbTime(dbConn);
+      List<Long> txnIds = new ArrayList<>(numTxns);
 
-        // Insert transaction entries into MIN_HISTORY_LEVEL.
-        List<String> inserts = sqlGenerator.createInsertValuesStmt(
-                "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows);
-        for (String insert : inserts) {
-          LOG.debug("Going to execute insert <" + insert + ">");
-          stmt.execute(insert);
-        }
-        LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
-                + ") with min_open_txn: " + minOpenTxnId);
+      List<String> rows = new ArrayList<>();
+      for (long i = first; i < first + numTxns; i++) {
+        txnIds.add(i);
+        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ","
+                + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()));
+      }
+      List<String> queries = sqlGenerator.createInsertValuesStmt(
+              "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host)", rows);
+      for (String q : queries) {
+        LOG.debug("Going to execute update <" + q + ">");
+        stmt.execute(q);
+      }
 
-        if (rqst.isSetReplPolicy()) {
-          List<String> rowsRepl = new ArrayList<>();
+      // Need to register minimum open txnid for current transactions into MIN_HISTORY table.
+      s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN);
+      LOG.debug("Going to execute query <" + s + ">");
+      rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        throw new IllegalStateException("Scalar query returned no rows?!?!!");
+      }
 
-          for (int i = 0; i < numTxns; i++) {
-            rowsRepl.add(
-                    quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
-          }
+      // TXNS table should have atleast one entry because we just inserted the newly opened txns.
+      // So, min(txn_id) would be a non-zero txnid.
+      long minOpenTxnId = rs.getLong(1);
+      assert (minOpenTxnId > 0);
+      rows.clear();
+      for (long txnId = first; txnId < first + numTxns; txnId++) {
+        rows.add(txnId + ", " + minOpenTxnId);
+      }
 
-          List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
-                  "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl);
+      // Insert transaction entries into MIN_HISTORY_LEVEL.
+      List<String> inserts = sqlGenerator.createInsertValuesStmt(
+              "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows);
+      for (String insert : inserts) {
+        LOG.debug("Going to execute insert <" + insert + ">");
+        stmt.execute(insert);
+      }
+      LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
+              + ") with min_open_txn: " + minOpenTxnId);
 
-          for (String query : queriesRepl) {
-            LOG.info("Going to execute insert <" + query + ">");
-            stmt.execute(query);
-          }
+      if (rqst.isSetReplPolicy()) {
+        List<String> rowsRepl = new ArrayList<>();
+
+        for (int i = 0; i < numTxns; i++) {
+          rowsRepl.add(
+                  quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
         }
 
-        if (transactionalListeners != null) {
-          MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                  EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
+        List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
+                "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl);
+
+        for (String query : queriesRepl) {
+          LOG.info("Going to execute insert <" + query + ">");
+          stmt.execute(query);
         }
+      }
 
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return new OpenTxnsResponse(txnIds);
-      } catch (SQLException e) {
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database "
-          + StringUtils.stringifyException(e));
-      } finally {
-        close(rs, stmt, dbConn);
-        unlockInternal();
+      if (transactionalListeners != null) {
+        MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
       }
-    } catch (RetryException e) {
-      return openTxns(rqst);
+      return txnIds;
+    } finally {
+      close(rs);
     }
   }
 
@@ -1097,6 +1111,136 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  /**
+   * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
+   * @param rqst info on table/partitions and writeid snapshot to replicate.
+   * @throws MetaException
+   */
+  @Override
+  @RetrySemantics.Idempotent("No-op if already replicated the writeid state")
+  public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException {
+    String dbName = rqst.getDbName().toLowerCase();
+    String tblName = rqst.getTableName().toLowerCase();
+    ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(rqst.getValidWriteIdlist());
+
+    // Get the abortedWriteIds which are already sorted in ascending order.
+    List<Long> abortedWriteIds = getAbortedWriteIds(validWriteIdList);
+    int numAbortedWrites = abortedWriteIds.size();
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      ResultSet rs = null;
+      TxnStore.MutexAPI.LockHandle handle = null;
+      try {
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+
+        // Clean the txn to writeid map/TXN_COMPONENTS for the given table as we bootstrap here
+        String sql = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName)
+                + " and t2w_table = " + quoteString(tblName);
+        LOG.debug("Going to execute delete <" + sql + ">");
+        stmt.executeUpdate(sql);
+
+        if (numAbortedWrites > 0) {
+          // Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted.
+          List<Long> txnIds = openTxns(dbConn, stmt,
+                  new OpenTxnRequest(numAbortedWrites, rqst.getUser(), rqst.getHostName()));
+          assert(numAbortedWrites == txnIds.size());
+
+          // Map each aborted write id with each allocated txn.
+          List<String> rows = new ArrayList<>();
+          int i = 0;
+          for (long txn : txnIds) {
+            long writeId = abortedWriteIds.get(i++);
+            rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
+            LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
+          }
+
+          // Insert entries to TXN_TO_WRITE_ID for aborted write ids
+          List<String> inserts = sqlGenerator.createInsertValuesStmt(
+                  "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
+          for (String insert : inserts) {
+            LOG.debug("Going to execute insert <" + insert + ">");
+            stmt.execute(insert);
+          }
+
+          // Abort all the allocated txns so that the mapped write ids are referred as aborted ones.
+          int numAborts = abortTxns(dbConn, txnIds, true);
+          assert(numAborts == numAbortedWrites);
+        }
+        handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
+
+        // There are some txns in the list which has no write id allocated and hence go ahead and do it.
+        // Get the next write id for the given table and update it with new next write id.
+        // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
+        sql = sqlGenerator.addForUpdateClause(
+                "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
+                        + " and nwi_table = " + quoteString(tblName));
+        LOG.debug("Going to execute query <" + sql + ">");
+
+        long nextWriteId = validWriteIdList.getHighWatermark() + 1;
+        rs = stmt.executeQuery(sql);
+        if (!rs.next()) {
+          // First allocation of write id (hwm+1) should add the table to the next_write_id meta table.
+          sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
+                  + quoteString(dbName) + "," + quoteString(tblName) + ","
+                  + Long.toString(nextWriteId) + ")";
+          LOG.debug("Going to execute insert <" + sql + ">");
+          stmt.execute(sql);
+        } else {
+          // Update the NEXT_WRITE_ID for the given table with hwm+1 from source
+          sql = "update NEXT_WRITE_ID set nwi_next = " + (nextWriteId)
+                  + " where nwi_database = " + quoteString(dbName)
+                  + " and nwi_table = " + quoteString(tblName);
+          LOG.debug("Going to execute update <" + sql + ">");
+          stmt.executeUpdate(sql);
+        }
+
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")");
+        throw new MetaException("Unable to update transaction database "
+                + StringUtils.stringifyException(e));
+      } finally {
+        close(rs, stmt, dbConn);
+        if(handle != null) {
+          handle.releaseLocks();
+        }
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      replTableWriteIdState(rqst);
+    }
+
+    // Schedule Major compaction on all the partitions/table to clean aborted data
+    if (numAbortedWrites > 0) {
+      CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), rqst.getTableName(),
+              CompactionType.MAJOR);
+      if (rqst.isSetPartNames()) {
+        for (String partName : rqst.getPartNames()) {
+          compactRqst.setPartitionname(partName);
+          compact(compactRqst);
+        }
+      } else {
+        compact(compactRqst);
+      }
+    }
+  }
+
+  private List<Long> getAbortedWriteIds(ValidWriteIdList validWriteIdList) {
+    List<Long> abortedWriteIds = new ArrayList<>();
+    for (long writeId : validWriteIdList.getInvalidWriteIds()) {
+      if (validWriteIdList.isWriteIdAborted(writeId)) {
+        abortedWriteIds.add(writeId);
+      }
+    }
+    return abortedWriteIds;
+  }
+
   @Override
   @RetrySemantics.ReadOnly
   public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
@@ -1336,7 +1480,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here
           writeId = 1;
           s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
-                  + quoteString(dbName) + "," + quoteString(tblName) + "," + String.valueOf(numOfWriteIds + 1) + ")";
+                  + quoteString(dbName) + "," + quoteString(tblName) + "," + Long.toString(numOfWriteIds + 1) + ")";
           LOG.debug("Going to execute insert <" + s + ">");
           stmt.execute(s);
         } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6d8b845..b8e398f 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.api.*;
@@ -117,13 +116,21 @@ public interface TxnStore extends Configurable {
     throws NoSuchTxnException, TxnAbortedException,  MetaException;
 
   /**
+   * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
+   * @param rqst info on table/partitions and writeid snapshot to replicate.
+   * @throws MetaException in case of failure
+   */
+  @RetrySemantics.Idempotent
+  void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException;
+
+  /**
    * Get the first transaction corresponding to given database and table after transactions
    * referenced in the transaction snapshot.
    * @return
    * @throws MetaException
    */
   @RetrySemantics.Idempotent
-  public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit(
+  BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit(
       String inputDbName, String inputTableName, ValidWriteIdList txnList)
           throws MetaException;
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 1880d44..fa291d5 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
@@ -56,24 +57,46 @@ public class TxnUtils {
    * @return a valid txn list.
    */
   public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
-    /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
+    /*
+     * The highWaterMark should be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
      * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which
-     * doesn't make sense for Snapshot Isolation.  Of course for Read Committed, the list should
-     * inlude the latest committed set.*/
-    long highWater = txns.getTxn_high_water_mark();
-    List<Long> open = txns.getOpen_txns();
-    BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
-    long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
+     * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should
+     * include the latest committed set.
+     */
+    long highWaterMark = (currentTxn > 0) ? Math.min(currentTxn, txns.getTxn_high_water_mark())
+                                          : txns.getTxn_high_water_mark();
+
+    // Open txns are already sorted in ascending order. This list may or may not include HWM
+    // but it is guaranteed that list won't have txn > HWM. But, if we overwrite the HWM with currentTxn
+    // then need to truncate the exceptions list accordingly.
+    List<Long> openTxns = txns.getOpen_txns();
+
+    // We care only about open/aborted txns below currentTxn and hence the size should be determined
+    // for the exceptions list. The currentTxn will be missing in openTxns list only in rare case like
+    // txn is aborted by AcidHouseKeeperService and compactor actually cleans up the aborted txns.
+    // So, for such cases, we get negative value for sizeToHwm with found position for currentTxn, and so,
+    // we just negate it to get the size.
+    int sizeToHwm = (currentTxn > 0) ? Collections.binarySearch(openTxns, currentTxn) : openTxns.size();
+    sizeToHwm = (sizeToHwm < 0) ? (-sizeToHwm) : sizeToHwm;
+    long[] exceptions = new long[sizeToHwm];
+    BitSet inAbortedBits = BitSet.valueOf(txns.getAbortedBits());
+    BitSet outAbortedBits = new BitSet();
+    long minOpenTxnId = Long.MAX_VALUE;
     int i = 0;
-    for (long txn : open) {
-      if (currentTxn > 0 && currentTxn == txn) continue;
+    for (long txn : openTxns) {
+      // For snapshot isolation, we don't care about txns greater than current txn and so stop here.
+      // Also, we need not include current txn to exceptions list.
+      if ((currentTxn > 0) && (txn >= currentTxn)) {
+        break;
+      }
+      if (inAbortedBits.get(i)) {
+        outAbortedBits.set(i);
+      } else if (minOpenTxnId == Long.MAX_VALUE) {
+        minOpenTxnId = txn;
+      }
       exceptions[i++] = txn;
     }
-    if (txns.isSetMin_open_txn()) {
-      return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn());
-    } else {
-      return new ValidReadTxnList(exceptions, abortedBits, highWater);
-    }
+    return new ValidReadTxnList(exceptions, outAbortedBits, highWaterMark, minOpenTxnId);
   }
 
   /**