You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dj...@apache.org on 2018/05/08 18:42:07 UTC
[02/58] [abbrv] hive git commit: HIVE-18988: Support bootstrap
replication of ACID tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera,
Thejas M Nair)
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index b1e577a..accb94f 100644
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -11532,6 +11532,154 @@ class CommitTxnRequest:
def __ne__(self, other):
return not (self == other)
+class ReplTblWriteIdStateRequest:
+ """
+ Attributes:
+ - validWriteIdlist
+ - user
+ - hostName
+ - dbName
+ - tableName
+ - partNames
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'validWriteIdlist', None, None, ), # 1
+ (2, TType.STRING, 'user', None, None, ), # 2
+ (3, TType.STRING, 'hostName', None, None, ), # 3
+ (4, TType.STRING, 'dbName', None, None, ), # 4
+ (5, TType.STRING, 'tableName', None, None, ), # 5
+ (6, TType.LIST, 'partNames', (TType.STRING,None), None, ), # 6
+ )
+
+ def __init__(self, validWriteIdlist=None, user=None, hostName=None, dbName=None, tableName=None, partNames=None,):
+ self.validWriteIdlist = validWriteIdlist
+ self.user = user
+ self.hostName = hostName
+ self.dbName = dbName
+ self.tableName = tableName
+ self.partNames = partNames
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.validWriteIdlist = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.user = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.hostName = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.STRING:
+ self.dbName = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.STRING:
+ self.tableName = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.LIST:
+ self.partNames = []
+ (_etype526, _size523) = iprot.readListBegin()
+ for _i527 in xrange(_size523):
+ _elem528 = iprot.readString()
+ self.partNames.append(_elem528)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('ReplTblWriteIdStateRequest')
+ if self.validWriteIdlist is not None:
+ oprot.writeFieldBegin('validWriteIdlist', TType.STRING, 1)
+ oprot.writeString(self.validWriteIdlist)
+ oprot.writeFieldEnd()
+ if self.user is not None:
+ oprot.writeFieldBegin('user', TType.STRING, 2)
+ oprot.writeString(self.user)
+ oprot.writeFieldEnd()
+ if self.hostName is not None:
+ oprot.writeFieldBegin('hostName', TType.STRING, 3)
+ oprot.writeString(self.hostName)
+ oprot.writeFieldEnd()
+ if self.dbName is not None:
+ oprot.writeFieldBegin('dbName', TType.STRING, 4)
+ oprot.writeString(self.dbName)
+ oprot.writeFieldEnd()
+ if self.tableName is not None:
+ oprot.writeFieldBegin('tableName', TType.STRING, 5)
+ oprot.writeString(self.tableName)
+ oprot.writeFieldEnd()
+ if self.partNames is not None:
+ oprot.writeFieldBegin('partNames', TType.LIST, 6)
+ oprot.writeListBegin(TType.STRING, len(self.partNames))
+ for iter529 in self.partNames:
+ oprot.writeString(iter529)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.validWriteIdlist is None:
+ raise TProtocol.TProtocolException(message='Required field validWriteIdlist is unset!')
+ if self.user is None:
+ raise TProtocol.TProtocolException(message='Required field user is unset!')
+ if self.hostName is None:
+ raise TProtocol.TProtocolException(message='Required field hostName is unset!')
+ if self.dbName is None:
+ raise TProtocol.TProtocolException(message='Required field dbName is unset!')
+ if self.tableName is None:
+ raise TProtocol.TProtocolException(message='Required field tableName is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.validWriteIdlist)
+ value = (value * 31) ^ hash(self.user)
+ value = (value * 31) ^ hash(self.hostName)
+ value = (value * 31) ^ hash(self.dbName)
+ value = (value * 31) ^ hash(self.tableName)
+ value = (value * 31) ^ hash(self.partNames)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class GetValidWriteIdsRequest:
"""
Attributes:
@@ -11561,10 +11709,10 @@ class GetValidWriteIdsRequest:
if fid == 1:
if ftype == TType.LIST:
self.fullTableNames = []
- (_etype526, _size523) = iprot.readListBegin()
- for _i527 in xrange(_size523):
- _elem528 = iprot.readString()
- self.fullTableNames.append(_elem528)
+ (_etype533, _size530) = iprot.readListBegin()
+ for _i534 in xrange(_size530):
+ _elem535 = iprot.readString()
+ self.fullTableNames.append(_elem535)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -11586,8 +11734,8 @@ class GetValidWriteIdsRequest:
if self.fullTableNames is not None:
oprot.writeFieldBegin('fullTableNames', TType.LIST, 1)
oprot.writeListBegin(TType.STRING, len(self.fullTableNames))
- for iter529 in self.fullTableNames:
- oprot.writeString(iter529)
+ for iter536 in self.fullTableNames:
+ oprot.writeString(iter536)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.validTxnList is not None:
@@ -11670,10 +11818,10 @@ class TableValidWriteIds:
elif fid == 3:
if ftype == TType.LIST:
self.invalidWriteIds = []
- (_etype533, _size530) = iprot.readListBegin()
- for _i534 in xrange(_size530):
- _elem535 = iprot.readI64()
- self.invalidWriteIds.append(_elem535)
+ (_etype540, _size537) = iprot.readListBegin()
+ for _i541 in xrange(_size537):
+ _elem542 = iprot.readI64()
+ self.invalidWriteIds.append(_elem542)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -11708,8 +11856,8 @@ class TableValidWriteIds:
if self.invalidWriteIds is not None:
oprot.writeFieldBegin('invalidWriteIds', TType.LIST, 3)
oprot.writeListBegin(TType.I64, len(self.invalidWriteIds))
- for iter536 in self.invalidWriteIds:
- oprot.writeI64(iter536)
+ for iter543 in self.invalidWriteIds:
+ oprot.writeI64(iter543)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.minOpenWriteId is not None:
@@ -11781,11 +11929,11 @@ class GetValidWriteIdsResponse:
if fid == 1:
if ftype == TType.LIST:
self.tblValidWriteIds = []
- (_etype540, _size537) = iprot.readListBegin()
- for _i541 in xrange(_size537):
- _elem542 = TableValidWriteIds()
- _elem542.read(iprot)
- self.tblValidWriteIds.append(_elem542)
+ (_etype547, _size544) = iprot.readListBegin()
+ for _i548 in xrange(_size544):
+ _elem549 = TableValidWriteIds()
+ _elem549.read(iprot)
+ self.tblValidWriteIds.append(_elem549)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -11802,8 +11950,8 @@ class GetValidWriteIdsResponse:
if self.tblValidWriteIds is not None:
oprot.writeFieldBegin('tblValidWriteIds', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.tblValidWriteIds))
- for iter543 in self.tblValidWriteIds:
- iter543.write(oprot)
+ for iter550 in self.tblValidWriteIds:
+ iter550.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -11879,10 +12027,10 @@ class AllocateTableWriteIdsRequest:
elif fid == 3:
if ftype == TType.LIST:
self.txnIds = []
- (_etype547, _size544) = iprot.readListBegin()
- for _i548 in xrange(_size544):
- _elem549 = iprot.readI64()
- self.txnIds.append(_elem549)
+ (_etype554, _size551) = iprot.readListBegin()
+ for _i555 in xrange(_size551):
+ _elem556 = iprot.readI64()
+ self.txnIds.append(_elem556)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -11894,11 +12042,11 @@ class AllocateTableWriteIdsRequest:
elif fid == 5:
if ftype == TType.LIST:
self.srcTxnToWriteIdList = []
- (_etype553, _size550) = iprot.readListBegin()
- for _i554 in xrange(_size550):
- _elem555 = TxnToWriteId()
- _elem555.read(iprot)
- self.srcTxnToWriteIdList.append(_elem555)
+ (_etype560, _size557) = iprot.readListBegin()
+ for _i561 in xrange(_size557):
+ _elem562 = TxnToWriteId()
+ _elem562.read(iprot)
+ self.srcTxnToWriteIdList.append(_elem562)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -11923,8 +12071,8 @@ class AllocateTableWriteIdsRequest:
if self.txnIds is not None:
oprot.writeFieldBegin('txnIds', TType.LIST, 3)
oprot.writeListBegin(TType.I64, len(self.txnIds))
- for iter556 in self.txnIds:
- oprot.writeI64(iter556)
+ for iter563 in self.txnIds:
+ oprot.writeI64(iter563)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.replPolicy is not None:
@@ -11934,8 +12082,8 @@ class AllocateTableWriteIdsRequest:
if self.srcTxnToWriteIdList is not None:
oprot.writeFieldBegin('srcTxnToWriteIdList', TType.LIST, 5)
oprot.writeListBegin(TType.STRUCT, len(self.srcTxnToWriteIdList))
- for iter557 in self.srcTxnToWriteIdList:
- iter557.write(oprot)
+ for iter564 in self.srcTxnToWriteIdList:
+ iter564.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -12077,11 +12225,11 @@ class AllocateTableWriteIdsResponse:
if fid == 1:
if ftype == TType.LIST:
self.txnToWriteIds = []
- (_etype561, _size558) = iprot.readListBegin()
- for _i562 in xrange(_size558):
- _elem563 = TxnToWriteId()
- _elem563.read(iprot)
- self.txnToWriteIds.append(_elem563)
+ (_etype568, _size565) = iprot.readListBegin()
+ for _i569 in xrange(_size565):
+ _elem570 = TxnToWriteId()
+ _elem570.read(iprot)
+ self.txnToWriteIds.append(_elem570)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12098,8 +12246,8 @@ class AllocateTableWriteIdsResponse:
if self.txnToWriteIds is not None:
oprot.writeFieldBegin('txnToWriteIds', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.txnToWriteIds))
- for iter564 in self.txnToWriteIds:
- iter564.write(oprot)
+ for iter571 in self.txnToWriteIds:
+ iter571.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -12327,11 +12475,11 @@ class LockRequest:
if fid == 1:
if ftype == TType.LIST:
self.component = []
- (_etype568, _size565) = iprot.readListBegin()
- for _i569 in xrange(_size565):
- _elem570 = LockComponent()
- _elem570.read(iprot)
- self.component.append(_elem570)
+ (_etype575, _size572) = iprot.readListBegin()
+ for _i576 in xrange(_size572):
+ _elem577 = LockComponent()
+ _elem577.read(iprot)
+ self.component.append(_elem577)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12368,8 +12516,8 @@ class LockRequest:
if self.component is not None:
oprot.writeFieldBegin('component', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.component))
- for iter571 in self.component:
- iter571.write(oprot)
+ for iter578 in self.component:
+ iter578.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.txnid is not None:
@@ -13067,11 +13215,11 @@ class ShowLocksResponse:
if fid == 1:
if ftype == TType.LIST:
self.locks = []
- (_etype575, _size572) = iprot.readListBegin()
- for _i576 in xrange(_size572):
- _elem577 = ShowLocksResponseElement()
- _elem577.read(iprot)
- self.locks.append(_elem577)
+ (_etype582, _size579) = iprot.readListBegin()
+ for _i583 in xrange(_size579):
+ _elem584 = ShowLocksResponseElement()
+ _elem584.read(iprot)
+ self.locks.append(_elem584)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -13088,8 +13236,8 @@ class ShowLocksResponse:
if self.locks is not None:
oprot.writeFieldBegin('locks', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.locks))
- for iter578 in self.locks:
- iter578.write(oprot)
+ for iter585 in self.locks:
+ iter585.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -13304,20 +13452,20 @@ class HeartbeatTxnRangeResponse:
if fid == 1:
if ftype == TType.SET:
self.aborted = set()
- (_etype582, _size579) = iprot.readSetBegin()
- for _i583 in xrange(_size579):
- _elem584 = iprot.readI64()
- self.aborted.add(_elem584)
+ (_etype589, _size586) = iprot.readSetBegin()
+ for _i590 in xrange(_size586):
+ _elem591 = iprot.readI64()
+ self.aborted.add(_elem591)
iprot.readSetEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.SET:
self.nosuch = set()
- (_etype588, _size585) = iprot.readSetBegin()
- for _i589 in xrange(_size585):
- _elem590 = iprot.readI64()
- self.nosuch.add(_elem590)
+ (_etype595, _size592) = iprot.readSetBegin()
+ for _i596 in xrange(_size592):
+ _elem597 = iprot.readI64()
+ self.nosuch.add(_elem597)
iprot.readSetEnd()
else:
iprot.skip(ftype)
@@ -13334,15 +13482,15 @@ class HeartbeatTxnRangeResponse:
if self.aborted is not None:
oprot.writeFieldBegin('aborted', TType.SET, 1)
oprot.writeSetBegin(TType.I64, len(self.aborted))
- for iter591 in self.aborted:
- oprot.writeI64(iter591)
+ for iter598 in self.aborted:
+ oprot.writeI64(iter598)
oprot.writeSetEnd()
oprot.writeFieldEnd()
if self.nosuch is not None:
oprot.writeFieldBegin('nosuch', TType.SET, 2)
oprot.writeSetBegin(TType.I64, len(self.nosuch))
- for iter592 in self.nosuch:
- oprot.writeI64(iter592)
+ for iter599 in self.nosuch:
+ oprot.writeI64(iter599)
oprot.writeSetEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -13439,11 +13587,11 @@ class CompactionRequest:
elif fid == 6:
if ftype == TType.MAP:
self.properties = {}
- (_ktype594, _vtype595, _size593 ) = iprot.readMapBegin()
- for _i597 in xrange(_size593):
- _key598 = iprot.readString()
- _val599 = iprot.readString()
- self.properties[_key598] = _val599
+ (_ktype601, _vtype602, _size600 ) = iprot.readMapBegin()
+ for _i604 in xrange(_size600):
+ _key605 = iprot.readString()
+ _val606 = iprot.readString()
+ self.properties[_key605] = _val606
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -13480,9 +13628,9 @@ class CompactionRequest:
if self.properties is not None:
oprot.writeFieldBegin('properties', TType.MAP, 6)
oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.properties))
- for kiter600,viter601 in self.properties.items():
- oprot.writeString(kiter600)
- oprot.writeString(viter601)
+ for kiter607,viter608 in self.properties.items():
+ oprot.writeString(kiter607)
+ oprot.writeString(viter608)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -13917,11 +14065,11 @@ class ShowCompactResponse:
if fid == 1:
if ftype == TType.LIST:
self.compacts = []
- (_etype605, _size602) = iprot.readListBegin()
- for _i606 in xrange(_size602):
- _elem607 = ShowCompactResponseElement()
- _elem607.read(iprot)
- self.compacts.append(_elem607)
+ (_etype612, _size609) = iprot.readListBegin()
+ for _i613 in xrange(_size609):
+ _elem614 = ShowCompactResponseElement()
+ _elem614.read(iprot)
+ self.compacts.append(_elem614)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -13938,8 +14086,8 @@ class ShowCompactResponse:
if self.compacts is not None:
oprot.writeFieldBegin('compacts', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.compacts))
- for iter608 in self.compacts:
- iter608.write(oprot)
+ for iter615 in self.compacts:
+ iter615.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -14028,10 +14176,10 @@ class AddDynamicPartitions:
elif fid == 5:
if ftype == TType.LIST:
self.partitionnames = []
- (_etype612, _size609) = iprot.readListBegin()
- for _i613 in xrange(_size609):
- _elem614 = iprot.readString()
- self.partitionnames.append(_elem614)
+ (_etype619, _size616) = iprot.readListBegin()
+ for _i620 in xrange(_size616):
+ _elem621 = iprot.readString()
+ self.partitionnames.append(_elem621)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -14069,8 +14217,8 @@ class AddDynamicPartitions:
if self.partitionnames is not None:
oprot.writeFieldBegin('partitionnames', TType.LIST, 5)
oprot.writeListBegin(TType.STRING, len(self.partitionnames))
- for iter615 in self.partitionnames:
- oprot.writeString(iter615)
+ for iter622 in self.partitionnames:
+ oprot.writeString(iter622)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.operationType is not None:
@@ -14300,10 +14448,10 @@ class CreationMetadata:
elif fid == 4:
if ftype == TType.SET:
self.tablesUsed = set()
- (_etype619, _size616) = iprot.readSetBegin()
- for _i620 in xrange(_size616):
- _elem621 = iprot.readString()
- self.tablesUsed.add(_elem621)
+ (_etype626, _size623) = iprot.readSetBegin()
+ for _i627 in xrange(_size623):
+ _elem628 = iprot.readString()
+ self.tablesUsed.add(_elem628)
iprot.readSetEnd()
else:
iprot.skip(ftype)
@@ -14337,8 +14485,8 @@ class CreationMetadata:
if self.tablesUsed is not None:
oprot.writeFieldBegin('tablesUsed', TType.SET, 4)
oprot.writeSetBegin(TType.STRING, len(self.tablesUsed))
- for iter622 in self.tablesUsed:
- oprot.writeString(iter622)
+ for iter629 in self.tablesUsed:
+ oprot.writeString(iter629)
oprot.writeSetEnd()
oprot.writeFieldEnd()
if self.validTxnList is not None:
@@ -14650,11 +14798,11 @@ class NotificationEventResponse:
if fid == 1:
if ftype == TType.LIST:
self.events = []
- (_etype626, _size623) = iprot.readListBegin()
- for _i627 in xrange(_size623):
- _elem628 = NotificationEvent()
- _elem628.read(iprot)
- self.events.append(_elem628)
+ (_etype633, _size630) = iprot.readListBegin()
+ for _i634 in xrange(_size630):
+ _elem635 = NotificationEvent()
+ _elem635.read(iprot)
+ self.events.append(_elem635)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -14671,8 +14819,8 @@ class NotificationEventResponse:
if self.events is not None:
oprot.writeFieldBegin('events', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.events))
- for iter629 in self.events:
- iter629.write(oprot)
+ for iter636 in self.events:
+ iter636.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -14966,20 +15114,20 @@ class InsertEventRequestData:
elif fid == 2:
if ftype == TType.LIST:
self.filesAdded = []
- (_etype633, _size630) = iprot.readListBegin()
- for _i634 in xrange(_size630):
- _elem635 = iprot.readString()
- self.filesAdded.append(_elem635)
+ (_etype640, _size637) = iprot.readListBegin()
+ for _i641 in xrange(_size637):
+ _elem642 = iprot.readString()
+ self.filesAdded.append(_elem642)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.LIST:
self.filesAddedChecksum = []
- (_etype639, _size636) = iprot.readListBegin()
- for _i640 in xrange(_size636):
- _elem641 = iprot.readString()
- self.filesAddedChecksum.append(_elem641)
+ (_etype646, _size643) = iprot.readListBegin()
+ for _i647 in xrange(_size643):
+ _elem648 = iprot.readString()
+ self.filesAddedChecksum.append(_elem648)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15000,15 +15148,15 @@ class InsertEventRequestData:
if self.filesAdded is not None:
oprot.writeFieldBegin('filesAdded', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.filesAdded))
- for iter642 in self.filesAdded:
- oprot.writeString(iter642)
+ for iter649 in self.filesAdded:
+ oprot.writeString(iter649)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.filesAddedChecksum is not None:
oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 3)
oprot.writeListBegin(TType.STRING, len(self.filesAddedChecksum))
- for iter643 in self.filesAddedChecksum:
- oprot.writeString(iter643)
+ for iter650 in self.filesAddedChecksum:
+ oprot.writeString(iter650)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -15166,10 +15314,10 @@ class FireEventRequest:
elif fid == 5:
if ftype == TType.LIST:
self.partitionVals = []
- (_etype647, _size644) = iprot.readListBegin()
- for _i648 in xrange(_size644):
- _elem649 = iprot.readString()
- self.partitionVals.append(_elem649)
+ (_etype654, _size651) = iprot.readListBegin()
+ for _i655 in xrange(_size651):
+ _elem656 = iprot.readString()
+ self.partitionVals.append(_elem656)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15207,8 +15355,8 @@ class FireEventRequest:
if self.partitionVals is not None:
oprot.writeFieldBegin('partitionVals', TType.LIST, 5)
oprot.writeListBegin(TType.STRING, len(self.partitionVals))
- for iter650 in self.partitionVals:
- oprot.writeString(iter650)
+ for iter657 in self.partitionVals:
+ oprot.writeString(iter657)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.catName is not None:
@@ -15400,12 +15548,12 @@ class GetFileMetadataByExprResult:
if fid == 1:
if ftype == TType.MAP:
self.metadata = {}
- (_ktype652, _vtype653, _size651 ) = iprot.readMapBegin()
- for _i655 in xrange(_size651):
- _key656 = iprot.readI64()
- _val657 = MetadataPpdResult()
- _val657.read(iprot)
- self.metadata[_key656] = _val657
+ (_ktype659, _vtype660, _size658 ) = iprot.readMapBegin()
+ for _i662 in xrange(_size658):
+ _key663 = iprot.readI64()
+ _val664 = MetadataPpdResult()
+ _val664.read(iprot)
+ self.metadata[_key663] = _val664
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -15427,9 +15575,9 @@ class GetFileMetadataByExprResult:
if self.metadata is not None:
oprot.writeFieldBegin('metadata', TType.MAP, 1)
oprot.writeMapBegin(TType.I64, TType.STRUCT, len(self.metadata))
- for kiter658,viter659 in self.metadata.items():
- oprot.writeI64(kiter658)
- viter659.write(oprot)
+ for kiter665,viter666 in self.metadata.items():
+ oprot.writeI64(kiter665)
+ viter666.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.isSupported is not None:
@@ -15499,10 +15647,10 @@ class GetFileMetadataByExprRequest:
if fid == 1:
if ftype == TType.LIST:
self.fileIds = []
- (_etype663, _size660) = iprot.readListBegin()
- for _i664 in xrange(_size660):
- _elem665 = iprot.readI64()
- self.fileIds.append(_elem665)
+ (_etype670, _size667) = iprot.readListBegin()
+ for _i671 in xrange(_size667):
+ _elem672 = iprot.readI64()
+ self.fileIds.append(_elem672)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15534,8 +15682,8 @@ class GetFileMetadataByExprRequest:
if self.fileIds is not None:
oprot.writeFieldBegin('fileIds', TType.LIST, 1)
oprot.writeListBegin(TType.I64, len(self.fileIds))
- for iter666 in self.fileIds:
- oprot.writeI64(iter666)
+ for iter673 in self.fileIds:
+ oprot.writeI64(iter673)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.expr is not None:
@@ -15609,11 +15757,11 @@ class GetFileMetadataResult:
if fid == 1:
if ftype == TType.MAP:
self.metadata = {}
- (_ktype668, _vtype669, _size667 ) = iprot.readMapBegin()
- for _i671 in xrange(_size667):
- _key672 = iprot.readI64()
- _val673 = iprot.readString()
- self.metadata[_key672] = _val673
+ (_ktype675, _vtype676, _size674 ) = iprot.readMapBegin()
+ for _i678 in xrange(_size674):
+ _key679 = iprot.readI64()
+ _val680 = iprot.readString()
+ self.metadata[_key679] = _val680
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -15635,9 +15783,9 @@ class GetFileMetadataResult:
if self.metadata is not None:
oprot.writeFieldBegin('metadata', TType.MAP, 1)
oprot.writeMapBegin(TType.I64, TType.STRING, len(self.metadata))
- for kiter674,viter675 in self.metadata.items():
- oprot.writeI64(kiter674)
- oprot.writeString(viter675)
+ for kiter681,viter682 in self.metadata.items():
+ oprot.writeI64(kiter681)
+ oprot.writeString(viter682)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.isSupported is not None:
@@ -15698,10 +15846,10 @@ class GetFileMetadataRequest:
if fid == 1:
if ftype == TType.LIST:
self.fileIds = []
- (_etype679, _size676) = iprot.readListBegin()
- for _i680 in xrange(_size676):
- _elem681 = iprot.readI64()
- self.fileIds.append(_elem681)
+ (_etype686, _size683) = iprot.readListBegin()
+ for _i687 in xrange(_size683):
+ _elem688 = iprot.readI64()
+ self.fileIds.append(_elem688)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15718,8 +15866,8 @@ class GetFileMetadataRequest:
if self.fileIds is not None:
oprot.writeFieldBegin('fileIds', TType.LIST, 1)
oprot.writeListBegin(TType.I64, len(self.fileIds))
- for iter682 in self.fileIds:
- oprot.writeI64(iter682)
+ for iter689 in self.fileIds:
+ oprot.writeI64(iter689)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -15825,20 +15973,20 @@ class PutFileMetadataRequest:
if fid == 1:
if ftype == TType.LIST:
self.fileIds = []
- (_etype686, _size683) = iprot.readListBegin()
- for _i687 in xrange(_size683):
- _elem688 = iprot.readI64()
- self.fileIds.append(_elem688)
+ (_etype693, _size690) = iprot.readListBegin()
+ for _i694 in xrange(_size690):
+ _elem695 = iprot.readI64()
+ self.fileIds.append(_elem695)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.LIST:
self.metadata = []
- (_etype692, _size689) = iprot.readListBegin()
- for _i693 in xrange(_size689):
- _elem694 = iprot.readString()
- self.metadata.append(_elem694)
+ (_etype699, _size696) = iprot.readListBegin()
+ for _i700 in xrange(_size696):
+ _elem701 = iprot.readString()
+ self.metadata.append(_elem701)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15860,15 +16008,15 @@ class PutFileMetadataRequest:
if self.fileIds is not None:
oprot.writeFieldBegin('fileIds', TType.LIST, 1)
oprot.writeListBegin(TType.I64, len(self.fileIds))
- for iter695 in self.fileIds:
- oprot.writeI64(iter695)
+ for iter702 in self.fileIds:
+ oprot.writeI64(iter702)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.metadata is not None:
oprot.writeFieldBegin('metadata', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.metadata))
- for iter696 in self.metadata:
- oprot.writeString(iter696)
+ for iter703 in self.metadata:
+ oprot.writeString(iter703)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.type is not None:
@@ -15976,10 +16124,10 @@ class ClearFileMetadataRequest:
if fid == 1:
if ftype == TType.LIST:
self.fileIds = []
- (_etype700, _size697) = iprot.readListBegin()
- for _i701 in xrange(_size697):
- _elem702 = iprot.readI64()
- self.fileIds.append(_elem702)
+ (_etype707, _size704) = iprot.readListBegin()
+ for _i708 in xrange(_size704):
+ _elem709 = iprot.readI64()
+ self.fileIds.append(_elem709)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15996,8 +16144,8 @@ class ClearFileMetadataRequest:
if self.fileIds is not None:
oprot.writeFieldBegin('fileIds', TType.LIST, 1)
oprot.writeListBegin(TType.I64, len(self.fileIds))
- for iter703 in self.fileIds:
- oprot.writeI64(iter703)
+ for iter710 in self.fileIds:
+ oprot.writeI64(iter710)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -16226,11 +16374,11 @@ class GetAllFunctionsResponse:
if fid == 1:
if ftype == TType.LIST:
self.functions = []
- (_etype707, _size704) = iprot.readListBegin()
- for _i708 in xrange(_size704):
- _elem709 = Function()
- _elem709.read(iprot)
- self.functions.append(_elem709)
+ (_etype714, _size711) = iprot.readListBegin()
+ for _i715 in xrange(_size711):
+ _elem716 = Function()
+ _elem716.read(iprot)
+ self.functions.append(_elem716)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16247,8 +16395,8 @@ class GetAllFunctionsResponse:
if self.functions is not None:
oprot.writeFieldBegin('functions', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.functions))
- for iter710 in self.functions:
- iter710.write(oprot)
+ for iter717 in self.functions:
+ iter717.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -16300,10 +16448,10 @@ class ClientCapabilities:
if fid == 1:
if ftype == TType.LIST:
self.values = []
- (_etype714, _size711) = iprot.readListBegin()
- for _i715 in xrange(_size711):
- _elem716 = iprot.readI32()
- self.values.append(_elem716)
+ (_etype721, _size718) = iprot.readListBegin()
+ for _i722 in xrange(_size718):
+ _elem723 = iprot.readI32()
+ self.values.append(_elem723)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16320,8 +16468,8 @@ class ClientCapabilities:
if self.values is not None:
oprot.writeFieldBegin('values', TType.LIST, 1)
oprot.writeListBegin(TType.I32, len(self.values))
- for iter717 in self.values:
- oprot.writeI32(iter717)
+ for iter724 in self.values:
+ oprot.writeI32(iter724)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -16566,10 +16714,10 @@ class GetTablesRequest:
elif fid == 2:
if ftype == TType.LIST:
self.tblNames = []
- (_etype721, _size718) = iprot.readListBegin()
- for _i722 in xrange(_size718):
- _elem723 = iprot.readString()
- self.tblNames.append(_elem723)
+ (_etype728, _size725) = iprot.readListBegin()
+ for _i729 in xrange(_size725):
+ _elem730 = iprot.readString()
+ self.tblNames.append(_elem730)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16601,8 +16749,8 @@ class GetTablesRequest:
if self.tblNames is not None:
oprot.writeFieldBegin('tblNames', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.tblNames))
- for iter724 in self.tblNames:
- oprot.writeString(iter724)
+ for iter731 in self.tblNames:
+ oprot.writeString(iter731)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.capabilities is not None:
@@ -16667,11 +16815,11 @@ class GetTablesResult:
if fid == 1:
if ftype == TType.LIST:
self.tables = []
- (_etype728, _size725) = iprot.readListBegin()
- for _i729 in xrange(_size725):
- _elem730 = Table()
- _elem730.read(iprot)
- self.tables.append(_elem730)
+ (_etype735, _size732) = iprot.readListBegin()
+ for _i736 in xrange(_size732):
+ _elem737 = Table()
+ _elem737.read(iprot)
+ self.tables.append(_elem737)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16688,8 +16836,8 @@ class GetTablesResult:
if self.tables is not None:
oprot.writeFieldBegin('tables', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.tables))
- for iter731 in self.tables:
- iter731.write(oprot)
+ for iter738 in self.tables:
+ iter738.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -17003,10 +17151,10 @@ class Materialization:
if fid == 1:
if ftype == TType.SET:
self.tablesUsed = set()
- (_etype735, _size732) = iprot.readSetBegin()
- for _i736 in xrange(_size732):
- _elem737 = iprot.readString()
- self.tablesUsed.add(_elem737)
+ (_etype742, _size739) = iprot.readSetBegin()
+ for _i743 in xrange(_size739):
+ _elem744 = iprot.readString()
+ self.tablesUsed.add(_elem744)
iprot.readSetEnd()
else:
iprot.skip(ftype)
@@ -17038,8 +17186,8 @@ class Materialization:
if self.tablesUsed is not None:
oprot.writeFieldBegin('tablesUsed', TType.SET, 1)
oprot.writeSetBegin(TType.STRING, len(self.tablesUsed))
- for iter738 in self.tablesUsed:
- oprot.writeString(iter738)
+ for iter745 in self.tablesUsed:
+ oprot.writeString(iter745)
oprot.writeSetEnd()
oprot.writeFieldEnd()
if self.validTxnList is not None:
@@ -17944,44 +18092,44 @@ class WMFullResourcePlan:
elif fid == 2:
if ftype == TType.LIST:
self.pools = []
- (_etype742, _size739) = iprot.readListBegin()
- for _i743 in xrange(_size739):
- _elem744 = WMPool()
- _elem744.read(iprot)
- self.pools.append(_elem744)
+ (_etype749, _size746) = iprot.readListBegin()
+ for _i750 in xrange(_size746):
+ _elem751 = WMPool()
+ _elem751.read(iprot)
+ self.pools.append(_elem751)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.LIST:
self.mappings = []
- (_etype748, _size745) = iprot.readListBegin()
- for _i749 in xrange(_size745):
- _elem750 = WMMapping()
- _elem750.read(iprot)
- self.mappings.append(_elem750)
+ (_etype755, _size752) = iprot.readListBegin()
+ for _i756 in xrange(_size752):
+ _elem757 = WMMapping()
+ _elem757.read(iprot)
+ self.mappings.append(_elem757)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.LIST:
self.triggers = []
- (_etype754, _size751) = iprot.readListBegin()
- for _i755 in xrange(_size751):
- _elem756 = WMTrigger()
- _elem756.read(iprot)
- self.triggers.append(_elem756)
+ (_etype761, _size758) = iprot.readListBegin()
+ for _i762 in xrange(_size758):
+ _elem763 = WMTrigger()
+ _elem763.read(iprot)
+ self.triggers.append(_elem763)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.LIST:
self.poolTriggers = []
- (_etype760, _size757) = iprot.readListBegin()
- for _i761 in xrange(_size757):
- _elem762 = WMPoolTrigger()
- _elem762.read(iprot)
- self.poolTriggers.append(_elem762)
+ (_etype767, _size764) = iprot.readListBegin()
+ for _i768 in xrange(_size764):
+ _elem769 = WMPoolTrigger()
+ _elem769.read(iprot)
+ self.poolTriggers.append(_elem769)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -18002,29 +18150,29 @@ class WMFullResourcePlan:
if self.pools is not None:
oprot.writeFieldBegin('pools', TType.LIST, 2)
oprot.writeListBegin(TType.STRUCT, len(self.pools))
- for iter763 in self.pools:
- iter763.write(oprot)
+ for iter770 in self.pools:
+ iter770.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.mappings is not None:
oprot.writeFieldBegin('mappings', TType.LIST, 3)
oprot.writeListBegin(TType.STRUCT, len(self.mappings))
- for iter764 in self.mappings:
- iter764.write(oprot)
+ for iter771 in self.mappings:
+ iter771.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.triggers is not None:
oprot.writeFieldBegin('triggers', TType.LIST, 4)
oprot.writeListBegin(TType.STRUCT, len(self.triggers))
- for iter765 in self.triggers:
- iter765.write(oprot)
+ for iter772 in self.triggers:
+ iter772.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.poolTriggers is not None:
oprot.writeFieldBegin('poolTriggers', TType.LIST, 5)
oprot.writeListBegin(TType.STRUCT, len(self.poolTriggers))
- for iter766 in self.poolTriggers:
- iter766.write(oprot)
+ for iter773 in self.poolTriggers:
+ iter773.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -18498,11 +18646,11 @@ class WMGetAllResourcePlanResponse:
if fid == 1:
if ftype == TType.LIST:
self.resourcePlans = []
- (_etype770, _size767) = iprot.readListBegin()
- for _i771 in xrange(_size767):
- _elem772 = WMResourcePlan()
- _elem772.read(iprot)
- self.resourcePlans.append(_elem772)
+ (_etype777, _size774) = iprot.readListBegin()
+ for _i778 in xrange(_size774):
+ _elem779 = WMResourcePlan()
+ _elem779.read(iprot)
+ self.resourcePlans.append(_elem779)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -18519,8 +18667,8 @@ class WMGetAllResourcePlanResponse:
if self.resourcePlans is not None:
oprot.writeFieldBegin('resourcePlans', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.resourcePlans))
- for iter773 in self.resourcePlans:
- iter773.write(oprot)
+ for iter780 in self.resourcePlans:
+ iter780.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -18824,20 +18972,20 @@ class WMValidateResourcePlanResponse:
if fid == 1:
if ftype == TType.LIST:
self.errors = []
- (_etype777, _size774) = iprot.readListBegin()
- for _i778 in xrange(_size774):
- _elem779 = iprot.readString()
- self.errors.append(_elem779)
+ (_etype784, _size781) = iprot.readListBegin()
+ for _i785 in xrange(_size781):
+ _elem786 = iprot.readString()
+ self.errors.append(_elem786)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.LIST:
self.warnings = []
- (_etype783, _size780) = iprot.readListBegin()
- for _i784 in xrange(_size780):
- _elem785 = iprot.readString()
- self.warnings.append(_elem785)
+ (_etype790, _size787) = iprot.readListBegin()
+ for _i791 in xrange(_size787):
+ _elem792 = iprot.readString()
+ self.warnings.append(_elem792)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -18854,15 +19002,15 @@ class WMValidateResourcePlanResponse:
if self.errors is not None:
oprot.writeFieldBegin('errors', TType.LIST, 1)
oprot.writeListBegin(TType.STRING, len(self.errors))
- for iter786 in self.errors:
- oprot.writeString(iter786)
+ for iter793 in self.errors:
+ oprot.writeString(iter793)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.warnings is not None:
oprot.writeFieldBegin('warnings', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.warnings))
- for iter787 in self.warnings:
- oprot.writeString(iter787)
+ for iter794 in self.warnings:
+ oprot.writeString(iter794)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -19439,11 +19587,11 @@ class WMGetTriggersForResourePlanResponse:
if fid == 1:
if ftype == TType.LIST:
self.triggers = []
- (_etype791, _size788) = iprot.readListBegin()
- for _i792 in xrange(_size788):
- _elem793 = WMTrigger()
- _elem793.read(iprot)
- self.triggers.append(_elem793)
+ (_etype798, _size795) = iprot.readListBegin()
+ for _i799 in xrange(_size795):
+ _elem800 = WMTrigger()
+ _elem800.read(iprot)
+ self.triggers.append(_elem800)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -19460,8 +19608,8 @@ class WMGetTriggersForResourePlanResponse:
if self.triggers is not None:
oprot.writeFieldBegin('triggers', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.triggers))
- for iter794 in self.triggers:
- iter794.write(oprot)
+ for iter801 in self.triggers:
+ iter801.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -20645,11 +20793,11 @@ class SchemaVersion:
elif fid == 4:
if ftype == TType.LIST:
self.cols = []
- (_etype798, _size795) = iprot.readListBegin()
- for _i799 in xrange(_size795):
- _elem800 = FieldSchema()
- _elem800.read(iprot)
- self.cols.append(_elem800)
+ (_etype805, _size802) = iprot.readListBegin()
+ for _i806 in xrange(_size802):
+ _elem807 = FieldSchema()
+ _elem807.read(iprot)
+ self.cols.append(_elem807)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -20709,8 +20857,8 @@ class SchemaVersion:
if self.cols is not None:
oprot.writeFieldBegin('cols', TType.LIST, 4)
oprot.writeListBegin(TType.STRUCT, len(self.cols))
- for iter801 in self.cols:
- iter801.write(oprot)
+ for iter808 in self.cols:
+ iter808.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.state is not None:
@@ -20965,11 +21113,11 @@ class FindSchemasByColsResp:
if fid == 1:
if ftype == TType.LIST:
self.schemaVersions = []
- (_etype805, _size802) = iprot.readListBegin()
- for _i806 in xrange(_size802):
- _elem807 = SchemaVersionDescriptor()
- _elem807.read(iprot)
- self.schemaVersions.append(_elem807)
+ (_etype812, _size809) = iprot.readListBegin()
+ for _i813 in xrange(_size809):
+ _elem814 = SchemaVersionDescriptor()
+ _elem814.read(iprot)
+ self.schemaVersions.append(_elem814)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -20986,8 +21134,8 @@ class FindSchemasByColsResp:
if self.schemaVersions is not None:
oprot.writeFieldBegin('schemaVersions', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.schemaVersions))
- for iter808 in self.schemaVersions:
- iter808.write(oprot)
+ for iter815 in self.schemaVersions:
+ iter815.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 2687ce5..21dc708 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2572,6 +2572,37 @@ class CommitTxnRequest
::Thrift::Struct.generate_accessors self
end
+class ReplTblWriteIdStateRequest
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ VALIDWRITEIDLIST = 1
+ USER = 2
+ HOSTNAME = 3
+ DBNAME = 4
+ TABLENAME = 5
+ PARTNAMES = 6
+
+ FIELDS = {
+ VALIDWRITEIDLIST => {:type => ::Thrift::Types::STRING, :name => 'validWriteIdlist'},
+ USER => {:type => ::Thrift::Types::STRING, :name => 'user'},
+ HOSTNAME => {:type => ::Thrift::Types::STRING, :name => 'hostName'},
+ DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
+ TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
+ PARTNAMES => {:type => ::Thrift::Types::LIST, :name => 'partNames', :element => {:type => ::Thrift::Types::STRING}, :optional => true}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field validWriteIdlist is unset!') unless @validWriteIdlist
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field user is unset!') unless @user
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field hostName is unset!') unless @hostName
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tableName is unset!') unless @tableName
+ end
+
+ ::Thrift::Struct.generate_accessors self
+end
+
class GetValidWriteIdsRequest
include ::Thrift::Struct, ::Thrift::Struct_Union
FULLTABLENAMES = 1
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index 4de8bd3..7946b6c 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -2437,6 +2437,20 @@ module ThriftHiveMetastore
return
end
+ def repl_tbl_writeid_state(rqst)
+ send_repl_tbl_writeid_state(rqst)
+ recv_repl_tbl_writeid_state()
+ end
+
+ def send_repl_tbl_writeid_state(rqst)
+ send_message('repl_tbl_writeid_state', Repl_tbl_writeid_state_args, :rqst => rqst)
+ end
+
+ def recv_repl_tbl_writeid_state()
+ result = receive_message(Repl_tbl_writeid_state_result)
+ return
+ end
+
def get_valid_write_ids(rqst)
send_get_valid_write_ids(rqst)
return recv_get_valid_write_ids()
@@ -5273,6 +5287,13 @@ module ThriftHiveMetastore
write_result(result, oprot, 'commit_txn', seqid)
end
+ def process_repl_tbl_writeid_state(seqid, iprot, oprot)
+ args = read_args(iprot, Repl_tbl_writeid_state_args)
+ result = Repl_tbl_writeid_state_result.new()
+ @handler.repl_tbl_writeid_state(args.rqst)
+ write_result(result, oprot, 'repl_tbl_writeid_state', seqid)
+ end
+
def process_get_valid_write_ids(seqid, iprot, oprot)
args = read_args(iprot, Get_valid_write_ids_args)
result = Get_valid_write_ids_result.new()
@@ -11467,6 +11488,37 @@ module ThriftHiveMetastore
::Thrift::Struct.generate_accessors self
end
+ class Repl_tbl_writeid_state_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ RQST = 1
+
+ FIELDS = {
+ RQST => {:type => ::Thrift::Types::STRUCT, :name => 'rqst', :class => ::ReplTblWriteIdStateRequest}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Repl_tbl_writeid_state_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+
+ FIELDS = {
+
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
class Get_valid_write_ids_args
include ::Thrift::Struct, ::Thrift::Struct_Union
RQST = 1
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 397a081..92cd131 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -7022,6 +7022,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
@Override
+ public void repl_tbl_writeid_state(ReplTblWriteIdStateRequest rqst) throws TException {
+ getTxnHandler().replTableWriteIdState(rqst);
+ }
+
+ @Override
public GetValidWriteIdsResponse get_valid_write_ids(GetValidWriteIdsRequest rqst) throws TException {
return getTxnHandler().getValidWriteIds(rqst);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 1138ed3..70080b1 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2487,6 +2487,33 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
}
@Override
+ public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+ throws TException {
+ String user;
+ try {
+ user = UserGroupInformation.getCurrentUser().getUserName();
+ } catch (IOException e) {
+ LOG.error("Unable to resolve current user name " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+
+ String hostName;
+ try {
+ hostName = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ LOG.error("Unable to resolve my host name " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+
+ ReplTblWriteIdStateRequest rqst
+ = new ReplTblWriteIdStateRequest(validWriteIdList, user, hostName, dbName, tableName);
+ if (partNames != null) {
+ rqst.setPartNames(partNames);
+ }
+ client.repl_tbl_writeid_state(rqst);
+ }
+
+ @Override
public long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException {
return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName).get(0).getWriteId();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 72b814d..0f302a6 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -2820,14 +2820,14 @@ public interface IMetaStoreClient {
/**
* Rollback a transaction. This will also unlock any locks associated with
* this transaction.
- * @param txnid id of transaction to be rolled back.
+ * @param srcTxnid id of transaction at source while is rolled back and to be replicated.
* @param replPolicy the replication policy to identify the source cluster
* @throws NoSuchTxnException if the requested transaction does not exist.
* Note that this can result from the transaction having timed out and been
* deleted.
* @throws TException
*/
- void replRollbackTxn(long txnid, String replPolicy) throws NoSuchTxnException, TException;
+ void replRollbackTxn(long srcTxnid, String replPolicy) throws NoSuchTxnException, TException;
/**
* Commit a transaction. This will also unlock any locks associated with
@@ -2846,7 +2846,7 @@ public interface IMetaStoreClient {
/**
* Commit a transaction. This will also unlock any locks associated with
* this transaction.
- * @param txnid id of transaction to be committed.
+ * @param srcTxnid id of transaction at source which is committed and to be replicated.
* @param replPolicy the replication policy to identify the source cluster
* @throws NoSuchTxnException if the requested transaction does not exist.
* This can result fro the transaction having timed out and been deleted by
@@ -2855,7 +2855,7 @@ public interface IMetaStoreClient {
* aborted. This can result from the transaction timing out.
* @throws TException
*/
- void replCommitTxn(long txnid, String replPolicy)
+ void replCommitTxn(long srcTxnid, String replPolicy)
throws NoSuchTxnException, TxnAbortedException, TException;
/**
@@ -2874,6 +2874,17 @@ public interface IMetaStoreClient {
long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException;
/**
+ * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
+ * @param validWriteIdList Snapshot of writeid list when the table/partition is dumped.
+ * @param dbName Database name
+ * @param tableName Table which is written.
+ * @param partNames List of partitions being written.
+ * @throws TException in case of failure to replicate the writeid state
+ */
+ void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+ throws TException;
+
+ /**
* Allocate a per table write ID and associate it with the given transaction.
* @param txnIds ids of transaction batchto which the allocated write ID to be associated.
* @param dbName name of DB in which the table belongs.
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 7c1d5f5..abe1226 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -64,25 +64,24 @@ public class ReplChangeManager {
}
public static class FileInfo {
- FileSystem srcFs;
- Path sourcePath;
- Path cmPath;
- String checkSum;
- boolean useSourcePath;
-
- public FileInfo(FileSystem srcFs, Path sourcePath) {
- this.srcFs = srcFs;
- this.sourcePath = sourcePath;
- this.cmPath = null;
- this.checkSum = null;
- this.useSourcePath = true;
+ private FileSystem srcFs;
+ private Path sourcePath;
+ private Path cmPath;
+ private String checkSum;
+ private boolean useSourcePath;
+ private String subDir;
+
+ public FileInfo(FileSystem srcFs, Path sourcePath, String subDir) {
+ this(srcFs, sourcePath, null, null, true, subDir);
}
- public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath, String checkSum, boolean useSourcePath) {
+ public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath,
+ String checkSum, boolean useSourcePath, String subDir) {
this.srcFs = srcFs;
this.sourcePath = sourcePath;
this.cmPath = cmPath;
this.checkSum = checkSum;
this.useSourcePath = useSourcePath;
+ this.subDir = subDir;
}
public FileSystem getSrcFs() {
return srcFs;
@@ -102,6 +101,9 @@ public class ReplChangeManager {
public void setIsUseSourcePath(boolean useSourcePath) {
this.useSourcePath = useSourcePath;
}
+ public String getSubDir() {
+ return subDir;
+ }
public Path getEffectivePath() {
if (useSourcePath) {
return sourcePath;
@@ -301,20 +303,21 @@ public class ReplChangeManager {
* matches, return the file; otherwise, use chksumString to retrieve it from cmroot
* @param src Original file location
* @param checksumString Checksum of the original file
+ * @param subDir Sub directory to which the source file belongs to
* @param conf
* @return Corresponding FileInfo object
*/
- public static FileInfo getFileInfo(Path src, String checksumString, Configuration conf)
+ public static FileInfo getFileInfo(Path src, String checksumString, String subDir, Configuration conf)
throws MetaException {
try {
FileSystem srcFs = src.getFileSystem(conf);
if (checksumString == null) {
- return new FileInfo(srcFs, src);
+ return new FileInfo(srcFs, src, subDir);
}
Path cmPath = getCMPath(conf, src.getName(), checksumString);
if (!srcFs.exists(src)) {
- return new FileInfo(srcFs, src, cmPath, checksumString, false);
+ return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
}
String currentChecksumString;
@@ -322,12 +325,12 @@ public class ReplChangeManager {
currentChecksumString = checksumFor(src, srcFs);
} catch (IOException ex) {
// If the file is missing or getting modified, then refer CM path
- return new FileInfo(srcFs, src, cmPath, checksumString, false);
+ return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
}
if ((currentChecksumString == null) || checksumString.equals(currentChecksumString)) {
- return new FileInfo(srcFs, src, cmPath, checksumString, true);
+ return new FileInfo(srcFs, src, cmPath, checksumString, true, subDir);
} else {
- return new FileInfo(srcFs, src, cmPath, checksumString, false);
+ return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir);
}
} catch (IOException e) {
throw new MetaException(StringUtils.stringifyException(e));
@@ -335,19 +338,24 @@ public class ReplChangeManager {
}
/***
- * Concatenate filename and checksum with "#"
+ * Concatenate filename, checksum and subdirectory with "#"
* @param fileUriStr Filename string
* @param fileChecksum Checksum string
+ * @param encodedSubDir sub directory path into which this file belongs to. Here encoded means,
+ * the multiple levels of subdirectories are concatenated with path separator "/"
* @return Concatenated Uri string
*/
// TODO: this needs to be enhanced once change management based filesystem is implemented
- // Currently using fileuri#checksum as the format
- static public String encodeFileUri(String fileUriStr, String fileChecksum) {
+ // Currently using fileuri#checksum#subdirs as the format
+ public static String encodeFileUri(String fileUriStr, String fileChecksum, String encodedSubDir) {
+ String encodedUri = fileUriStr;
if (fileChecksum != null) {
- return fileUriStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
- } else {
- return fileUriStr;
+ encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum;
+ }
+ if (encodedSubDir != null) {
+ encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + encodedSubDir;
}
+ return encodedUri;
}
/***
@@ -357,11 +365,14 @@ public class ReplChangeManager {
*/
static public String[] getFileWithChksumFromURI(String fileURIStr) {
String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR);
- String[] result = new String[2];
+ String[] result = new String[3];
result[0] = uriAndFragment[0];
if (uriAndFragment.length>1) {
result[1] = uriAndFragment[1];
}
+ if (uriAndFragment.length>2) {
+ result[2] = uriAndFragment[2];
+ }
return result;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index db596a6..c513b4d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.classification.RetrySemantics;
@@ -104,6 +105,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
@@ -557,7 +559,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
Connection dbConn = null;
Statement stmt = null;
- ResultSet rs = null;
try {
lockInternal();
/**
@@ -583,111 +584,124 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (numTxns > maxTxns) numTxns = maxTxns;
stmt = dbConn.createStatement();
+ List<Long> txnIds = openTxns(dbConn, stmt, rqst);
- if (rqst.isSetReplPolicy()) {
- List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt);
- if (!targetTxnIdList.isEmpty()) {
- if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
- LOG.warn("target txn id number " + targetTxnIdList.toString() +
- " is not matching with source txn id number " + rqst.getReplSrcTxnIds().toString());
- }
- LOG.info("Target transactions " + targetTxnIdList.toString() + " are present for repl policy :" +
- rqst.getReplPolicy() + " and Source transaction id : " + rqst.getReplSrcTxnIds().toString());
- return new OpenTxnsResponse(targetTxnIdList);
- }
- }
-
- String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction database not properly " +
- "configured, can't find next transaction id.");
- }
- long first = rs.getLong(1);
- s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ return new OpenTxnsResponse(txnIds);
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
+ throw new MetaException("Unable to select from transaction database "
+ + StringUtils.stringifyException(e));
+ } finally {
+ close(null, stmt, dbConn);
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ return openTxns(rqst);
+ }
+ }
- long now = getDbTime(dbConn);
- List<Long> txnIds = new ArrayList<>(numTxns);
+ private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst)
+ throws SQLException, MetaException {
+ int numTxns = rqst.getNum_txns();
+ ResultSet rs = null;
+ try {
+ if (rqst.isSetReplPolicy()) {
+ List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt);
- List<String> rows = new ArrayList<>();
- for (long i = first; i < first + numTxns; i++) {
- txnIds.add(i);
- rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + "," + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()));
- }
- List<String> queries = sqlGenerator.createInsertValuesStmt(
- "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host)", rows);
- for (String q : queries) {
- LOG.debug("Going to execute update <" + q + ">");
- stmt.execute(q);
+ if (!targetTxnIdList.isEmpty()) {
+ if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
+ LOG.warn("target txn id number " + targetTxnIdList.toString() +
+ " is not matching with source txn id number " + rqst.getReplSrcTxnIds().toString());
+ }
+ LOG.info("Target transactions " + targetTxnIdList.toString() + " are present for repl policy :" +
+ rqst.getReplPolicy() + " and Source transaction id : " + rqst.getReplSrcTxnIds().toString());
+ return targetTxnIdList;
}
+ }
- // Need to register minimum open txnid for current transactions into MIN_HISTORY table.
- s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN);
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new IllegalStateException("Scalar query returned no rows?!?!!");
- }
+ String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID");
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new MetaException("Transaction database not properly " +
+ "configured, can't find next transaction id.");
+ }
+ long first = rs.getLong(1);
+ s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns);
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
- // TXNS table should have atleast one entry because we just inserted the newly opened txns.
- // So, min(txn_id) would be a non-zero txnid.
- long minOpenTxnId = rs.getLong(1);
- assert(minOpenTxnId > 0);
- rows.clear();
- for (long txnId = first; txnId < first + numTxns; txnId++) {
- rows.add(txnId + ", " + minOpenTxnId);
- }
+ long now = getDbTime(dbConn);
+ List<Long> txnIds = new ArrayList<>(numTxns);
- // Insert transaction entries into MIN_HISTORY_LEVEL.
- List<String> inserts = sqlGenerator.createInsertValuesStmt(
- "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows);
- for (String insert : inserts) {
- LOG.debug("Going to execute insert <" + insert + ">");
- stmt.execute(insert);
- }
- LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
- + ") with min_open_txn: " + minOpenTxnId);
+ List<String> rows = new ArrayList<>();
+ for (long i = first; i < first + numTxns; i++) {
+ txnIds.add(i);
+ rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ","
+ + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()));
+ }
+ List<String> queries = sqlGenerator.createInsertValuesStmt(
+ "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host)", rows);
+ for (String q : queries) {
+ LOG.debug("Going to execute update <" + q + ">");
+ stmt.execute(q);
+ }
- if (rqst.isSetReplPolicy()) {
- List<String> rowsRepl = new ArrayList<>();
+ // Need to register minimum open txnid for current transactions into MIN_HISTORY table.
+ s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN);
+ LOG.debug("Going to execute query <" + s + ">");
+ rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new IllegalStateException("Scalar query returned no rows?!?!!");
+ }
- for (int i = 0; i < numTxns; i++) {
- rowsRepl.add(
- quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
- }
+ // TXNS table should have atleast one entry because we just inserted the newly opened txns.
+ // So, min(txn_id) would be a non-zero txnid.
+ long minOpenTxnId = rs.getLong(1);
+ assert (minOpenTxnId > 0);
+ rows.clear();
+ for (long txnId = first; txnId < first + numTxns; txnId++) {
+ rows.add(txnId + ", " + minOpenTxnId);
+ }
- List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
- "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl);
+ // Insert transaction entries into MIN_HISTORY_LEVEL.
+ List<String> inserts = sqlGenerator.createInsertValuesStmt(
+ "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows);
+ for (String insert : inserts) {
+ LOG.debug("Going to execute insert <" + insert + ">");
+ stmt.execute(insert);
+ }
+ LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds
+ + ") with min_open_txn: " + minOpenTxnId);
- for (String query : queriesRepl) {
- LOG.info("Going to execute insert <" + query + ">");
- stmt.execute(query);
- }
+ if (rqst.isSetReplPolicy()) {
+ List<String> rowsRepl = new ArrayList<>();
+
+ for (int i = 0; i < numTxns; i++) {
+ rowsRepl.add(
+ quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
}
- if (transactionalListeners != null) {
- MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
- EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
+ List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
+ "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl);
+
+ for (String query : queriesRepl) {
+ LOG.info("Going to execute insert <" + query + ">");
+ stmt.execute(query);
}
+ }
- LOG.debug("Going to commit");
- dbConn.commit();
- return new OpenTxnsResponse(txnIds);
- } catch (SQLException e) {
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
- checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
- throw new MetaException("Unable to select from transaction database "
- + StringUtils.stringifyException(e));
- } finally {
- close(rs, stmt, dbConn);
- unlockInternal();
+ if (transactionalListeners != null) {
+ MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+ EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
}
- } catch (RetryException e) {
- return openTxns(rqst);
+ return txnIds;
+ } finally {
+ close(rs);
}
}
@@ -1097,6 +1111,136 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ /**
+ * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
+ * @param rqst info on table/partitions and writeid snapshot to replicate.
+ * @throws MetaException
+ */
+ @Override
+ @RetrySemantics.Idempotent("No-op if already replicated the writeid state")
+ public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException {
+ String dbName = rqst.getDbName().toLowerCase();
+ String tblName = rqst.getTableName().toLowerCase();
+ ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(rqst.getValidWriteIdlist());
+
+ // Get the abortedWriteIds which are already sorted in ascending order.
+ List<Long> abortedWriteIds = getAbortedWriteIds(validWriteIdList);
+ int numAbortedWrites = abortedWriteIds.size();
+ try {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ TxnStore.MutexAPI.LockHandle handle = null;
+ try {
+ lockInternal();
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+
+ // Clean the txn to writeid map/TXN_COMPONENTS for the given table as we bootstrap here
+ String sql = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName)
+ + " and t2w_table = " + quoteString(tblName);
+ LOG.debug("Going to execute delete <" + sql + ">");
+ stmt.executeUpdate(sql);
+
+ if (numAbortedWrites > 0) {
+ // Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted.
+ List<Long> txnIds = openTxns(dbConn, stmt,
+ new OpenTxnRequest(numAbortedWrites, rqst.getUser(), rqst.getHostName()));
+ assert(numAbortedWrites == txnIds.size());
+
+ // Map each aborted write id with each allocated txn.
+ List<String> rows = new ArrayList<>();
+ int i = 0;
+ for (long txn : txnIds) {
+ long writeId = abortedWriteIds.get(i++);
+ rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
+ LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
+ }
+
+ // Insert entries to TXN_TO_WRITE_ID for aborted write ids
+ List<String> inserts = sqlGenerator.createInsertValuesStmt(
+ "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
+ for (String insert : inserts) {
+ LOG.debug("Going to execute insert <" + insert + ">");
+ stmt.execute(insert);
+ }
+
+ // Abort all the allocated txns so that the mapped write ids are referred as aborted ones.
+ int numAborts = abortTxns(dbConn, txnIds, true);
+ assert(numAborts == numAbortedWrites);
+ }
+ handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
+
+ // There are some txns in the list which have no write id allocated, so go ahead and allocate it.
+ // Get the next write id for the given table and update it with new next write id.
+ // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
+ sql = sqlGenerator.addForUpdateClause(
+ "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
+ + " and nwi_table = " + quoteString(tblName));
+ LOG.debug("Going to execute query <" + sql + ">");
+
+ long nextWriteId = validWriteIdList.getHighWatermark() + 1;
+ rs = stmt.executeQuery(sql);
+ if (!rs.next()) {
+ // First allocation of write id (hwm+1) should add the table to the next_write_id meta table.
+ sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
+ + quoteString(dbName) + "," + quoteString(tblName) + ","
+ + Long.toString(nextWriteId) + ")";
+ LOG.debug("Going to execute insert <" + sql + ">");
+ stmt.execute(sql);
+ } else {
+ // Update the NEXT_WRITE_ID for the given table with hwm+1 from source
+ sql = "update NEXT_WRITE_ID set nwi_next = " + (nextWriteId)
+ + " where nwi_database = " + quoteString(dbName)
+ + " and nwi_table = " + quoteString(tblName);
+ LOG.debug("Going to execute update <" + sql + ">");
+ stmt.executeUpdate(sql);
+ }
+
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")");
+ throw new MetaException("Unable to update transaction database "
+ + StringUtils.stringifyException(e));
+ } finally {
+ close(rs, stmt, dbConn);
+ if(handle != null) {
+ handle.releaseLocks();
+ }
+ unlockInternal();
+ }
+ } catch (RetryException e) {
+ replTableWriteIdState(rqst);
+ }
+
+ // Schedule Major compaction on all the partitions/table to clean aborted data
+ if (numAbortedWrites > 0) {
+ CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), rqst.getTableName(),
+ CompactionType.MAJOR);
+ if (rqst.isSetPartNames()) {
+ for (String partName : rqst.getPartNames()) {
+ compactRqst.setPartitionname(partName);
+ compact(compactRqst);
+ }
+ } else {
+ compact(compactRqst);
+ }
+ }
+ }
+
+ private List<Long> getAbortedWriteIds(ValidWriteIdList validWriteIdList) {
+ List<Long> abortedWriteIds = new ArrayList<>();
+ for (long writeId : validWriteIdList.getInvalidWriteIds()) {
+ if (validWriteIdList.isWriteIdAborted(writeId)) {
+ abortedWriteIds.add(writeId);
+ }
+ }
+ return abortedWriteIds;
+ }
+
@Override
@RetrySemantics.ReadOnly
public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst)
@@ -1336,7 +1480,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here
writeId = 1;
s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
- + quoteString(dbName) + "," + quoteString(tblName) + "," + String.valueOf(numOfWriteIds + 1) + ")";
+ + quoteString(dbName) + "," + quoteString(tblName) + "," + Long.toString(numOfWriteIds + 1) + ")";
LOG.debug("Going to execute insert <" + s + ">");
stmt.execute(s);
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 6d8b845..b8e398f 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.api.*;
@@ -117,13 +116,21 @@ public interface TxnStore extends Configurable {
throws NoSuchTxnException, TxnAbortedException, MetaException;
/**
+ * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
+ * @param rqst info on table/partitions and writeid snapshot to replicate.
+ * @throws MetaException in case of failure
+ */
+ @RetrySemantics.Idempotent
+ void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException;
+
+ /**
* Get the first transaction corresponding to given database and table after transactions
* referenced in the transaction snapshot.
* @return
* @throws MetaException
*/
@RetrySemantics.Idempotent
- public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit(
+ BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit(
String inputDbName, String inputTableName, ValidWriteIdList txnList)
throws MetaException;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 1880d44..fa291d5 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -56,24 +57,46 @@ public class TxnUtils {
* @return a valid txn list.
*/
public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) {
- /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
+ /*
+ * The highWaterMark should be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0
* otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which
- * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should
- * inlude the latest committed set.*/
- long highWater = txns.getTxn_high_water_mark();
- List<Long> open = txns.getOpen_txns();
- BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
- long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)];
+ * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should
+ * include the latest committed set.
+ */
+ long highWaterMark = (currentTxn > 0) ? Math.min(currentTxn, txns.getTxn_high_water_mark())
+ : txns.getTxn_high_water_mark();
+
+ // Open txns are already sorted in ascending order. This list may or may not include HWM
+ // but it is guaranteed that list won't have txn > HWM. But, if we overwrite the HWM with currentTxn
+ // then need to truncate the exceptions list accordingly.
+ List<Long> openTxns = txns.getOpen_txns();
+
+ // We care only about open/aborted txns below currentTxn and hence the size should be determined
+ // for the exceptions list. The currentTxn will be missing in openTxns list only in rare case like
+ // txn is aborted by AcidHouseKeeperService and compactor actually cleans up the aborted txns.
+ // So, for such cases, Collections.binarySearch returns -(insertionPoint) - 1 and we negate it to
+ // obtain a size. NOTE(review): negating that encoding yields insertionPoint + 1, not the count of
+ // elements below currentTxn, so when currentTxn is absent from the open list the exceptions array
+ // gets one extra trailing slot (value 0) — confirm this off-by-one is intended.
+ int sizeToHwm = (currentTxn > 0) ? Collections.binarySearch(openTxns, currentTxn) : openTxns.size();
+ sizeToHwm = (sizeToHwm < 0) ? (-sizeToHwm) : sizeToHwm;
+ long[] exceptions = new long[sizeToHwm];
+ BitSet inAbortedBits = BitSet.valueOf(txns.getAbortedBits());
+ BitSet outAbortedBits = new BitSet();
+ long minOpenTxnId = Long.MAX_VALUE;
int i = 0;
- for (long txn : open) {
- if (currentTxn > 0 && currentTxn == txn) continue;
+ for (long txn : openTxns) {
+ // For snapshot isolation, we don't care about txns greater than current txn and so stop here.
+ // Also, we need not include current txn to exceptions list.
+ if ((currentTxn > 0) && (txn >= currentTxn)) {
+ break;
+ }
+ if (inAbortedBits.get(i)) {
+ outAbortedBits.set(i);
+ } else if (minOpenTxnId == Long.MAX_VALUE) {
+ minOpenTxnId = txn;
+ }
exceptions[i++] = txn;
}
- if (txns.isSetMin_open_txn()) {
- return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn());
- } else {
- return new ValidReadTxnList(exceptions, abortedBits, highWater);
- }
+ return new ValidReadTxnList(exceptions, outAbortedBits, highWaterMark, minOpenTxnId);
}
/**