You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/03 10:02:42 UTC
[03/16] hive git commit: HIVE-19267: Replicate ACID/MM tables write
operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 11affe3..031e72b 100644
--- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -11562,17 +11562,20 @@ class CommitTxnRequest:
Attributes:
- txnid
- replPolicy
+ - writeEventInfos
"""
thrift_spec = (
None, # 0
(1, TType.I64, 'txnid', None, None, ), # 1
(2, TType.STRING, 'replPolicy', None, None, ), # 2
+ (3, TType.LIST, 'writeEventInfos', (TType.STRUCT,(WriteEventInfo, WriteEventInfo.thrift_spec)), None, ), # 3
)
- def __init__(self, txnid=None, replPolicy=None,):
+ def __init__(self, txnid=None, replPolicy=None, writeEventInfos=None,):
self.txnid = txnid
self.replPolicy = replPolicy
+ self.writeEventInfos = writeEventInfos
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -11593,6 +11596,17 @@ class CommitTxnRequest:
self.replPolicy = iprot.readString()
else:
iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.LIST:
+ self.writeEventInfos = []
+ (_etype526, _size523) = iprot.readListBegin()
+ for _i527 in xrange(_size523):
+ _elem528 = WriteEventInfo()
+ _elem528.read(iprot)
+ self.writeEventInfos.append(_elem528)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -11611,6 +11625,13 @@ class CommitTxnRequest:
oprot.writeFieldBegin('replPolicy', TType.STRING, 2)
oprot.writeString(self.replPolicy)
oprot.writeFieldEnd()
+ if self.writeEventInfos is not None:
+ oprot.writeFieldBegin('writeEventInfos', TType.LIST, 3)
+ oprot.writeListBegin(TType.STRUCT, len(self.writeEventInfos))
+ for iter529 in self.writeEventInfos:
+ iter529.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -11624,6 +11645,158 @@ class CommitTxnRequest:
value = 17
value = (value * 31) ^ hash(self.txnid)
value = (value * 31) ^ hash(self.replPolicy)
+ value = (value * 31) ^ hash(self.writeEventInfos)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class WriteEventInfo:
+ """
+ Attributes:
+ - writeId
+ - database
+ - table
+ - files
+ - partition
+ - tableObj
+ - partitionObj
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I64, 'writeId', None, None, ), # 1
+ (2, TType.STRING, 'database', None, None, ), # 2
+ (3, TType.STRING, 'table', None, None, ), # 3
+ (4, TType.STRING, 'files', None, None, ), # 4
+ (5, TType.STRING, 'partition', None, None, ), # 5
+ (6, TType.STRING, 'tableObj', None, None, ), # 6
+ (7, TType.STRING, 'partitionObj', None, None, ), # 7
+ )
+
+ def __init__(self, writeId=None, database=None, table=None, files=None, partition=None, tableObj=None, partitionObj=None,):
+ self.writeId = writeId
+ self.database = database
+ self.table = table
+ self.files = files
+ self.partition = partition
+ self.tableObj = tableObj
+ self.partitionObj = partitionObj
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I64:
+ self.writeId = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.database = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.table = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.STRING:
+ self.files = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.STRING:
+ self.partition = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.STRING:
+ self.tableObj = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 7:
+ if ftype == TType.STRING:
+ self.partitionObj = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('WriteEventInfo')
+ if self.writeId is not None:
+ oprot.writeFieldBegin('writeId', TType.I64, 1)
+ oprot.writeI64(self.writeId)
+ oprot.writeFieldEnd()
+ if self.database is not None:
+ oprot.writeFieldBegin('database', TType.STRING, 2)
+ oprot.writeString(self.database)
+ oprot.writeFieldEnd()
+ if self.table is not None:
+ oprot.writeFieldBegin('table', TType.STRING, 3)
+ oprot.writeString(self.table)
+ oprot.writeFieldEnd()
+ if self.files is not None:
+ oprot.writeFieldBegin('files', TType.STRING, 4)
+ oprot.writeString(self.files)
+ oprot.writeFieldEnd()
+ if self.partition is not None:
+ oprot.writeFieldBegin('partition', TType.STRING, 5)
+ oprot.writeString(self.partition)
+ oprot.writeFieldEnd()
+ if self.tableObj is not None:
+ oprot.writeFieldBegin('tableObj', TType.STRING, 6)
+ oprot.writeString(self.tableObj)
+ oprot.writeFieldEnd()
+ if self.partitionObj is not None:
+ oprot.writeFieldBegin('partitionObj', TType.STRING, 7)
+ oprot.writeString(self.partitionObj)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.writeId is None:
+ raise TProtocol.TProtocolException(message='Required field writeId is unset!')
+ if self.database is None:
+ raise TProtocol.TProtocolException(message='Required field database is unset!')
+ if self.table is None:
+ raise TProtocol.TProtocolException(message='Required field table is unset!')
+ if self.files is None:
+ raise TProtocol.TProtocolException(message='Required field files is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.writeId)
+ value = (value * 31) ^ hash(self.database)
+ value = (value * 31) ^ hash(self.table)
+ value = (value * 31) ^ hash(self.files)
+ value = (value * 31) ^ hash(self.partition)
+ value = (value * 31) ^ hash(self.tableObj)
+ value = (value * 31) ^ hash(self.partitionObj)
return value
def __repr__(self):
@@ -11703,10 +11876,10 @@ class ReplTblWriteIdStateRequest:
elif fid == 6:
if ftype == TType.LIST:
self.partNames = []
- (_etype526, _size523) = iprot.readListBegin()
- for _i527 in xrange(_size523):
- _elem528 = iprot.readString()
- self.partNames.append(_elem528)
+ (_etype533, _size530) = iprot.readListBegin()
+ for _i534 in xrange(_size530):
+ _elem535 = iprot.readString()
+ self.partNames.append(_elem535)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -11743,8 +11916,8 @@ class ReplTblWriteIdStateRequest:
if self.partNames is not None:
oprot.writeFieldBegin('partNames', TType.LIST, 6)
oprot.writeListBegin(TType.STRING, len(self.partNames))
- for iter529 in self.partNames:
- oprot.writeString(iter529)
+ for iter536 in self.partNames:
+ oprot.writeString(iter536)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -11814,10 +11987,10 @@ class GetValidWriteIdsRequest:
if fid == 1:
if ftype == TType.LIST:
self.fullTableNames = []
- (_etype533, _size530) = iprot.readListBegin()
- for _i534 in xrange(_size530):
- _elem535 = iprot.readString()
- self.fullTableNames.append(_elem535)
+ (_etype540, _size537) = iprot.readListBegin()
+ for _i541 in xrange(_size537):
+ _elem542 = iprot.readString()
+ self.fullTableNames.append(_elem542)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -11839,8 +12012,8 @@ class GetValidWriteIdsRequest:
if self.fullTableNames is not None:
oprot.writeFieldBegin('fullTableNames', TType.LIST, 1)
oprot.writeListBegin(TType.STRING, len(self.fullTableNames))
- for iter536 in self.fullTableNames:
- oprot.writeString(iter536)
+ for iter543 in self.fullTableNames:
+ oprot.writeString(iter543)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.validTxnList is not None:
@@ -11923,10 +12096,10 @@ class TableValidWriteIds:
elif fid == 3:
if ftype == TType.LIST:
self.invalidWriteIds = []
- (_etype540, _size537) = iprot.readListBegin()
- for _i541 in xrange(_size537):
- _elem542 = iprot.readI64()
- self.invalidWriteIds.append(_elem542)
+ (_etype547, _size544) = iprot.readListBegin()
+ for _i548 in xrange(_size544):
+ _elem549 = iprot.readI64()
+ self.invalidWriteIds.append(_elem549)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -11961,8 +12134,8 @@ class TableValidWriteIds:
if self.invalidWriteIds is not None:
oprot.writeFieldBegin('invalidWriteIds', TType.LIST, 3)
oprot.writeListBegin(TType.I64, len(self.invalidWriteIds))
- for iter543 in self.invalidWriteIds:
- oprot.writeI64(iter543)
+ for iter550 in self.invalidWriteIds:
+ oprot.writeI64(iter550)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.minOpenWriteId is not None:
@@ -12034,11 +12207,11 @@ class GetValidWriteIdsResponse:
if fid == 1:
if ftype == TType.LIST:
self.tblValidWriteIds = []
- (_etype547, _size544) = iprot.readListBegin()
- for _i548 in xrange(_size544):
- _elem549 = TableValidWriteIds()
- _elem549.read(iprot)
- self.tblValidWriteIds.append(_elem549)
+ (_etype554, _size551) = iprot.readListBegin()
+ for _i555 in xrange(_size551):
+ _elem556 = TableValidWriteIds()
+ _elem556.read(iprot)
+ self.tblValidWriteIds.append(_elem556)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12055,8 +12228,8 @@ class GetValidWriteIdsResponse:
if self.tblValidWriteIds is not None:
oprot.writeFieldBegin('tblValidWriteIds', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.tblValidWriteIds))
- for iter550 in self.tblValidWriteIds:
- iter550.write(oprot)
+ for iter557 in self.tblValidWriteIds:
+ iter557.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -12132,10 +12305,10 @@ class AllocateTableWriteIdsRequest:
elif fid == 3:
if ftype == TType.LIST:
self.txnIds = []
- (_etype554, _size551) = iprot.readListBegin()
- for _i555 in xrange(_size551):
- _elem556 = iprot.readI64()
- self.txnIds.append(_elem556)
+ (_etype561, _size558) = iprot.readListBegin()
+ for _i562 in xrange(_size558):
+ _elem563 = iprot.readI64()
+ self.txnIds.append(_elem563)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12147,11 +12320,11 @@ class AllocateTableWriteIdsRequest:
elif fid == 5:
if ftype == TType.LIST:
self.srcTxnToWriteIdList = []
- (_etype560, _size557) = iprot.readListBegin()
- for _i561 in xrange(_size557):
- _elem562 = TxnToWriteId()
- _elem562.read(iprot)
- self.srcTxnToWriteIdList.append(_elem562)
+ (_etype567, _size564) = iprot.readListBegin()
+ for _i568 in xrange(_size564):
+ _elem569 = TxnToWriteId()
+ _elem569.read(iprot)
+ self.srcTxnToWriteIdList.append(_elem569)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12176,8 +12349,8 @@ class AllocateTableWriteIdsRequest:
if self.txnIds is not None:
oprot.writeFieldBegin('txnIds', TType.LIST, 3)
oprot.writeListBegin(TType.I64, len(self.txnIds))
- for iter563 in self.txnIds:
- oprot.writeI64(iter563)
+ for iter570 in self.txnIds:
+ oprot.writeI64(iter570)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.replPolicy is not None:
@@ -12187,8 +12360,8 @@ class AllocateTableWriteIdsRequest:
if self.srcTxnToWriteIdList is not None:
oprot.writeFieldBegin('srcTxnToWriteIdList', TType.LIST, 5)
oprot.writeListBegin(TType.STRUCT, len(self.srcTxnToWriteIdList))
- for iter564 in self.srcTxnToWriteIdList:
- iter564.write(oprot)
+ for iter571 in self.srcTxnToWriteIdList:
+ iter571.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -12330,11 +12503,11 @@ class AllocateTableWriteIdsResponse:
if fid == 1:
if ftype == TType.LIST:
self.txnToWriteIds = []
- (_etype568, _size565) = iprot.readListBegin()
- for _i569 in xrange(_size565):
- _elem570 = TxnToWriteId()
- _elem570.read(iprot)
- self.txnToWriteIds.append(_elem570)
+ (_etype575, _size572) = iprot.readListBegin()
+ for _i576 in xrange(_size572):
+ _elem577 = TxnToWriteId()
+ _elem577.read(iprot)
+ self.txnToWriteIds.append(_elem577)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12351,8 +12524,8 @@ class AllocateTableWriteIdsResponse:
if self.txnToWriteIds is not None:
oprot.writeFieldBegin('txnToWriteIds', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.txnToWriteIds))
- for iter571 in self.txnToWriteIds:
- iter571.write(oprot)
+ for iter578 in self.txnToWriteIds:
+ iter578.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -12580,11 +12753,11 @@ class LockRequest:
if fid == 1:
if ftype == TType.LIST:
self.component = []
- (_etype575, _size572) = iprot.readListBegin()
- for _i576 in xrange(_size572):
- _elem577 = LockComponent()
- _elem577.read(iprot)
- self.component.append(_elem577)
+ (_etype582, _size579) = iprot.readListBegin()
+ for _i583 in xrange(_size579):
+ _elem584 = LockComponent()
+ _elem584.read(iprot)
+ self.component.append(_elem584)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -12621,8 +12794,8 @@ class LockRequest:
if self.component is not None:
oprot.writeFieldBegin('component', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.component))
- for iter578 in self.component:
- iter578.write(oprot)
+ for iter585 in self.component:
+ iter585.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.txnid is not None:
@@ -13320,11 +13493,11 @@ class ShowLocksResponse:
if fid == 1:
if ftype == TType.LIST:
self.locks = []
- (_etype582, _size579) = iprot.readListBegin()
- for _i583 in xrange(_size579):
- _elem584 = ShowLocksResponseElement()
- _elem584.read(iprot)
- self.locks.append(_elem584)
+ (_etype589, _size586) = iprot.readListBegin()
+ for _i590 in xrange(_size586):
+ _elem591 = ShowLocksResponseElement()
+ _elem591.read(iprot)
+ self.locks.append(_elem591)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -13341,8 +13514,8 @@ class ShowLocksResponse:
if self.locks is not None:
oprot.writeFieldBegin('locks', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.locks))
- for iter585 in self.locks:
- iter585.write(oprot)
+ for iter592 in self.locks:
+ iter592.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -13557,20 +13730,20 @@ class HeartbeatTxnRangeResponse:
if fid == 1:
if ftype == TType.SET:
self.aborted = set()
- (_etype589, _size586) = iprot.readSetBegin()
- for _i590 in xrange(_size586):
- _elem591 = iprot.readI64()
- self.aborted.add(_elem591)
+ (_etype596, _size593) = iprot.readSetBegin()
+ for _i597 in xrange(_size593):
+ _elem598 = iprot.readI64()
+ self.aborted.add(_elem598)
iprot.readSetEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.SET:
self.nosuch = set()
- (_etype595, _size592) = iprot.readSetBegin()
- for _i596 in xrange(_size592):
- _elem597 = iprot.readI64()
- self.nosuch.add(_elem597)
+ (_etype602, _size599) = iprot.readSetBegin()
+ for _i603 in xrange(_size599):
+ _elem604 = iprot.readI64()
+ self.nosuch.add(_elem604)
iprot.readSetEnd()
else:
iprot.skip(ftype)
@@ -13587,15 +13760,15 @@ class HeartbeatTxnRangeResponse:
if self.aborted is not None:
oprot.writeFieldBegin('aborted', TType.SET, 1)
oprot.writeSetBegin(TType.I64, len(self.aborted))
- for iter598 in self.aborted:
- oprot.writeI64(iter598)
+ for iter605 in self.aborted:
+ oprot.writeI64(iter605)
oprot.writeSetEnd()
oprot.writeFieldEnd()
if self.nosuch is not None:
oprot.writeFieldBegin('nosuch', TType.SET, 2)
oprot.writeSetBegin(TType.I64, len(self.nosuch))
- for iter599 in self.nosuch:
- oprot.writeI64(iter599)
+ for iter606 in self.nosuch:
+ oprot.writeI64(iter606)
oprot.writeSetEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -13692,11 +13865,11 @@ class CompactionRequest:
elif fid == 6:
if ftype == TType.MAP:
self.properties = {}
- (_ktype601, _vtype602, _size600 ) = iprot.readMapBegin()
- for _i604 in xrange(_size600):
- _key605 = iprot.readString()
- _val606 = iprot.readString()
- self.properties[_key605] = _val606
+ (_ktype608, _vtype609, _size607 ) = iprot.readMapBegin()
+ for _i611 in xrange(_size607):
+ _key612 = iprot.readString()
+ _val613 = iprot.readString()
+ self.properties[_key612] = _val613
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -13733,9 +13906,9 @@ class CompactionRequest:
if self.properties is not None:
oprot.writeFieldBegin('properties', TType.MAP, 6)
oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.properties))
- for kiter607,viter608 in self.properties.items():
- oprot.writeString(kiter607)
- oprot.writeString(viter608)
+ for kiter614,viter615 in self.properties.items():
+ oprot.writeString(kiter614)
+ oprot.writeString(viter615)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -14170,11 +14343,11 @@ class ShowCompactResponse:
if fid == 1:
if ftype == TType.LIST:
self.compacts = []
- (_etype612, _size609) = iprot.readListBegin()
- for _i613 in xrange(_size609):
- _elem614 = ShowCompactResponseElement()
- _elem614.read(iprot)
- self.compacts.append(_elem614)
+ (_etype619, _size616) = iprot.readListBegin()
+ for _i620 in xrange(_size616):
+ _elem621 = ShowCompactResponseElement()
+ _elem621.read(iprot)
+ self.compacts.append(_elem621)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -14191,8 +14364,8 @@ class ShowCompactResponse:
if self.compacts is not None:
oprot.writeFieldBegin('compacts', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.compacts))
- for iter615 in self.compacts:
- iter615.write(oprot)
+ for iter622 in self.compacts:
+ iter622.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -14281,10 +14454,10 @@ class AddDynamicPartitions:
elif fid == 5:
if ftype == TType.LIST:
self.partitionnames = []
- (_etype619, _size616) = iprot.readListBegin()
- for _i620 in xrange(_size616):
- _elem621 = iprot.readString()
- self.partitionnames.append(_elem621)
+ (_etype626, _size623) = iprot.readListBegin()
+ for _i627 in xrange(_size623):
+ _elem628 = iprot.readString()
+ self.partitionnames.append(_elem628)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -14322,8 +14495,8 @@ class AddDynamicPartitions:
if self.partitionnames is not None:
oprot.writeFieldBegin('partitionnames', TType.LIST, 5)
oprot.writeListBegin(TType.STRING, len(self.partitionnames))
- for iter622 in self.partitionnames:
- oprot.writeString(iter622)
+ for iter629 in self.partitionnames:
+ oprot.writeString(iter629)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.operationType is not None:
@@ -14553,10 +14726,10 @@ class CreationMetadata:
elif fid == 4:
if ftype == TType.SET:
self.tablesUsed = set()
- (_etype626, _size623) = iprot.readSetBegin()
- for _i627 in xrange(_size623):
- _elem628 = iprot.readString()
- self.tablesUsed.add(_elem628)
+ (_etype633, _size630) = iprot.readSetBegin()
+ for _i634 in xrange(_size630):
+ _elem635 = iprot.readString()
+ self.tablesUsed.add(_elem635)
iprot.readSetEnd()
else:
iprot.skip(ftype)
@@ -14590,8 +14763,8 @@ class CreationMetadata:
if self.tablesUsed is not None:
oprot.writeFieldBegin('tablesUsed', TType.SET, 4)
oprot.writeSetBegin(TType.STRING, len(self.tablesUsed))
- for iter629 in self.tablesUsed:
- oprot.writeString(iter629)
+ for iter636 in self.tablesUsed:
+ oprot.writeString(iter636)
oprot.writeSetEnd()
oprot.writeFieldEnd()
if self.validTxnList is not None:
@@ -14903,11 +15076,11 @@ class NotificationEventResponse:
if fid == 1:
if ftype == TType.LIST:
self.events = []
- (_etype633, _size630) = iprot.readListBegin()
- for _i634 in xrange(_size630):
- _elem635 = NotificationEvent()
- _elem635.read(iprot)
- self.events.append(_elem635)
+ (_etype640, _size637) = iprot.readListBegin()
+ for _i641 in xrange(_size637):
+ _elem642 = NotificationEvent()
+ _elem642.read(iprot)
+ self.events.append(_elem642)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -14924,8 +15097,8 @@ class NotificationEventResponse:
if self.events is not None:
oprot.writeFieldBegin('events', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.events))
- for iter636 in self.events:
- iter636.write(oprot)
+ for iter643 in self.events:
+ iter643.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -15188,6 +15361,7 @@ class InsertEventRequestData:
- replace
- filesAdded
- filesAddedChecksum
+ - subDirectoryList
"""
thrift_spec = (
@@ -15195,12 +15369,14 @@ class InsertEventRequestData:
(1, TType.BOOL, 'replace', None, None, ), # 1
(2, TType.LIST, 'filesAdded', (TType.STRING,None), None, ), # 2
(3, TType.LIST, 'filesAddedChecksum', (TType.STRING,None), None, ), # 3
+ (4, TType.LIST, 'subDirectoryList', (TType.STRING,None), None, ), # 4
)
- def __init__(self, replace=None, filesAdded=None, filesAddedChecksum=None,):
+ def __init__(self, replace=None, filesAdded=None, filesAddedChecksum=None, subDirectoryList=None,):
self.replace = replace
self.filesAdded = filesAdded
self.filesAddedChecksum = filesAddedChecksum
+ self.subDirectoryList = subDirectoryList
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -15219,20 +15395,30 @@ class InsertEventRequestData:
elif fid == 2:
if ftype == TType.LIST:
self.filesAdded = []
- (_etype640, _size637) = iprot.readListBegin()
- for _i641 in xrange(_size637):
- _elem642 = iprot.readString()
- self.filesAdded.append(_elem642)
+ (_etype647, _size644) = iprot.readListBegin()
+ for _i648 in xrange(_size644):
+ _elem649 = iprot.readString()
+ self.filesAdded.append(_elem649)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.LIST:
self.filesAddedChecksum = []
- (_etype646, _size643) = iprot.readListBegin()
- for _i647 in xrange(_size643):
- _elem648 = iprot.readString()
- self.filesAddedChecksum.append(_elem648)
+ (_etype653, _size650) = iprot.readListBegin()
+ for _i654 in xrange(_size650):
+ _elem655 = iprot.readString()
+ self.filesAddedChecksum.append(_elem655)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.LIST:
+ self.subDirectoryList = []
+ (_etype659, _size656) = iprot.readListBegin()
+ for _i660 in xrange(_size656):
+ _elem661 = iprot.readString()
+ self.subDirectoryList.append(_elem661)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15253,15 +15439,22 @@ class InsertEventRequestData:
if self.filesAdded is not None:
oprot.writeFieldBegin('filesAdded', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.filesAdded))
- for iter649 in self.filesAdded:
- oprot.writeString(iter649)
+ for iter662 in self.filesAdded:
+ oprot.writeString(iter662)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.filesAddedChecksum is not None:
oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 3)
oprot.writeListBegin(TType.STRING, len(self.filesAddedChecksum))
- for iter650 in self.filesAddedChecksum:
- oprot.writeString(iter650)
+ for iter663 in self.filesAddedChecksum:
+ oprot.writeString(iter663)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.subDirectoryList is not None:
+ oprot.writeFieldBegin('subDirectoryList', TType.LIST, 4)
+ oprot.writeListBegin(TType.STRING, len(self.subDirectoryList))
+ for iter664 in self.subDirectoryList:
+ oprot.writeString(iter664)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -15278,6 +15471,7 @@ class InsertEventRequestData:
value = (value * 31) ^ hash(self.replace)
value = (value * 31) ^ hash(self.filesAdded)
value = (value * 31) ^ hash(self.filesAddedChecksum)
+ value = (value * 31) ^ hash(self.subDirectoryList)
return value
def __repr__(self):
@@ -15419,10 +15613,10 @@ class FireEventRequest:
elif fid == 5:
if ftype == TType.LIST:
self.partitionVals = []
- (_etype654, _size651) = iprot.readListBegin()
- for _i655 in xrange(_size651):
- _elem656 = iprot.readString()
- self.partitionVals.append(_elem656)
+ (_etype668, _size665) = iprot.readListBegin()
+ for _i669 in xrange(_size665):
+ _elem670 = iprot.readString()
+ self.partitionVals.append(_elem670)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15460,8 +15654,8 @@ class FireEventRequest:
if self.partitionVals is not None:
oprot.writeFieldBegin('partitionVals', TType.LIST, 5)
oprot.writeListBegin(TType.STRING, len(self.partitionVals))
- for iter657 in self.partitionVals:
- oprot.writeString(iter657)
+ for iter671 in self.partitionVals:
+ oprot.writeString(iter671)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.catName is not None:
@@ -15546,6 +15740,201 @@ class FireEventResponse:
def __ne__(self, other):
return not (self == other)
+class WriteNotificationLogRequest:
+ """
+ Attributes:
+ - txnId
+ - writeId
+ - db
+ - table
+ - fileInfo
+ - partitionVals
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I64, 'txnId', None, None, ), # 1
+ (2, TType.I64, 'writeId', None, None, ), # 2
+ (3, TType.STRING, 'db', None, None, ), # 3
+ (4, TType.STRING, 'table', None, None, ), # 4
+ (5, TType.STRUCT, 'fileInfo', (InsertEventRequestData, InsertEventRequestData.thrift_spec), None, ), # 5
+ (6, TType.LIST, 'partitionVals', (TType.STRING,None), None, ), # 6
+ )
+
+ def __init__(self, txnId=None, writeId=None, db=None, table=None, fileInfo=None, partitionVals=None,):
+ self.txnId = txnId
+ self.writeId = writeId
+ self.db = db
+ self.table = table
+ self.fileInfo = fileInfo
+ self.partitionVals = partitionVals
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I64:
+ self.txnId = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I64:
+ self.writeId = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.db = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.STRING:
+ self.table = iprot.readString()
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.STRUCT:
+ self.fileInfo = InsertEventRequestData()
+ self.fileInfo.read(iprot)
+ else:
+ iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.LIST:
+ self.partitionVals = []
+ (_etype675, _size672) = iprot.readListBegin()
+ for _i676 in xrange(_size672):
+ _elem677 = iprot.readString()
+ self.partitionVals.append(_elem677)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('WriteNotificationLogRequest')
+ if self.txnId is not None:
+ oprot.writeFieldBegin('txnId', TType.I64, 1)
+ oprot.writeI64(self.txnId)
+ oprot.writeFieldEnd()
+ if self.writeId is not None:
+ oprot.writeFieldBegin('writeId', TType.I64, 2)
+ oprot.writeI64(self.writeId)
+ oprot.writeFieldEnd()
+ if self.db is not None:
+ oprot.writeFieldBegin('db', TType.STRING, 3)
+ oprot.writeString(self.db)
+ oprot.writeFieldEnd()
+ if self.table is not None:
+ oprot.writeFieldBegin('table', TType.STRING, 4)
+ oprot.writeString(self.table)
+ oprot.writeFieldEnd()
+ if self.fileInfo is not None:
+ oprot.writeFieldBegin('fileInfo', TType.STRUCT, 5)
+ self.fileInfo.write(oprot)
+ oprot.writeFieldEnd()
+ if self.partitionVals is not None:
+ oprot.writeFieldBegin('partitionVals', TType.LIST, 6)
+ oprot.writeListBegin(TType.STRING, len(self.partitionVals))
+ for iter678 in self.partitionVals:
+ oprot.writeString(iter678)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.txnId is None:
+ raise TProtocol.TProtocolException(message='Required field txnId is unset!')
+ if self.writeId is None:
+ raise TProtocol.TProtocolException(message='Required field writeId is unset!')
+ if self.db is None:
+ raise TProtocol.TProtocolException(message='Required field db is unset!')
+ if self.table is None:
+ raise TProtocol.TProtocolException(message='Required field table is unset!')
+ if self.fileInfo is None:
+ raise TProtocol.TProtocolException(message='Required field fileInfo is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.txnId)
+ value = (value * 31) ^ hash(self.writeId)
+ value = (value * 31) ^ hash(self.db)
+ value = (value * 31) ^ hash(self.table)
+ value = (value * 31) ^ hash(self.fileInfo)
+ value = (value * 31) ^ hash(self.partitionVals)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class WriteNotificationLogResponse:
+
+ thrift_spec = (
+ )
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('WriteNotificationLogResponse')
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __hash__(self):
+ value = 17
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class MetadataPpdResult:
"""
Attributes:
@@ -15653,12 +16042,12 @@ class GetFileMetadataByExprResult:
if fid == 1:
if ftype == TType.MAP:
self.metadata = {}
- (_ktype659, _vtype660, _size658 ) = iprot.readMapBegin()
- for _i662 in xrange(_size658):
- _key663 = iprot.readI64()
- _val664 = MetadataPpdResult()
- _val664.read(iprot)
- self.metadata[_key663] = _val664
+ (_ktype680, _vtype681, _size679 ) = iprot.readMapBegin()
+ for _i683 in xrange(_size679):
+ _key684 = iprot.readI64()
+ _val685 = MetadataPpdResult()
+ _val685.read(iprot)
+ self.metadata[_key684] = _val685
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -15680,9 +16069,9 @@ class GetFileMetadataByExprResult:
if self.metadata is not None:
oprot.writeFieldBegin('metadata', TType.MAP, 1)
oprot.writeMapBegin(TType.I64, TType.STRUCT, len(self.metadata))
- for kiter665,viter666 in self.metadata.items():
- oprot.writeI64(kiter665)
- viter666.write(oprot)
+ for kiter686,viter687 in self.metadata.items():
+ oprot.writeI64(kiter686)
+ viter687.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.isSupported is not None:
@@ -15752,10 +16141,10 @@ class GetFileMetadataByExprRequest:
if fid == 1:
if ftype == TType.LIST:
self.fileIds = []
- (_etype670, _size667) = iprot.readListBegin()
- for _i671 in xrange(_size667):
- _elem672 = iprot.readI64()
- self.fileIds.append(_elem672)
+ (_etype691, _size688) = iprot.readListBegin()
+ for _i692 in xrange(_size688):
+ _elem693 = iprot.readI64()
+ self.fileIds.append(_elem693)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15787,8 +16176,8 @@ class GetFileMetadataByExprRequest:
if self.fileIds is not None:
oprot.writeFieldBegin('fileIds', TType.LIST, 1)
oprot.writeListBegin(TType.I64, len(self.fileIds))
- for iter673 in self.fileIds:
- oprot.writeI64(iter673)
+ for iter694 in self.fileIds:
+ oprot.writeI64(iter694)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.expr is not None:
@@ -15862,11 +16251,11 @@ class GetFileMetadataResult:
if fid == 1:
if ftype == TType.MAP:
self.metadata = {}
- (_ktype675, _vtype676, _size674 ) = iprot.readMapBegin()
- for _i678 in xrange(_size674):
- _key679 = iprot.readI64()
- _val680 = iprot.readString()
- self.metadata[_key679] = _val680
+ (_ktype696, _vtype697, _size695 ) = iprot.readMapBegin()
+ for _i699 in xrange(_size695):
+ _key700 = iprot.readI64()
+ _val701 = iprot.readString()
+ self.metadata[_key700] = _val701
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -15888,9 +16277,9 @@ class GetFileMetadataResult:
if self.metadata is not None:
oprot.writeFieldBegin('metadata', TType.MAP, 1)
oprot.writeMapBegin(TType.I64, TType.STRING, len(self.metadata))
- for kiter681,viter682 in self.metadata.items():
- oprot.writeI64(kiter681)
- oprot.writeString(viter682)
+ for kiter702,viter703 in self.metadata.items():
+ oprot.writeI64(kiter702)
+ oprot.writeString(viter703)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.isSupported is not None:
@@ -15951,10 +16340,10 @@ class GetFileMetadataRequest:
if fid == 1:
if ftype == TType.LIST:
self.fileIds = []
- (_etype686, _size683) = iprot.readListBegin()
- for _i687 in xrange(_size683):
- _elem688 = iprot.readI64()
- self.fileIds.append(_elem688)
+ (_etype707, _size704) = iprot.readListBegin()
+ for _i708 in xrange(_size704):
+ _elem709 = iprot.readI64()
+ self.fileIds.append(_elem709)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -15971,8 +16360,8 @@ class GetFileMetadataRequest:
if self.fileIds is not None:
oprot.writeFieldBegin('fileIds', TType.LIST, 1)
oprot.writeListBegin(TType.I64, len(self.fileIds))
- for iter689 in self.fileIds:
- oprot.writeI64(iter689)
+ for iter710 in self.fileIds:
+ oprot.writeI64(iter710)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -16078,20 +16467,20 @@ class PutFileMetadataRequest:
if fid == 1:
if ftype == TType.LIST:
self.fileIds = []
- (_etype693, _size690) = iprot.readListBegin()
- for _i694 in xrange(_size690):
- _elem695 = iprot.readI64()
- self.fileIds.append(_elem695)
+ (_etype714, _size711) = iprot.readListBegin()
+ for _i715 in xrange(_size711):
+ _elem716 = iprot.readI64()
+ self.fileIds.append(_elem716)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.LIST:
self.metadata = []
- (_etype699, _size696) = iprot.readListBegin()
- for _i700 in xrange(_size696):
- _elem701 = iprot.readString()
- self.metadata.append(_elem701)
+ (_etype720, _size717) = iprot.readListBegin()
+ for _i721 in xrange(_size717):
+ _elem722 = iprot.readString()
+ self.metadata.append(_elem722)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16113,15 +16502,15 @@ class PutFileMetadataRequest:
if self.fileIds is not None:
oprot.writeFieldBegin('fileIds', TType.LIST, 1)
oprot.writeListBegin(TType.I64, len(self.fileIds))
- for iter702 in self.fileIds:
- oprot.writeI64(iter702)
+ for iter723 in self.fileIds:
+ oprot.writeI64(iter723)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.metadata is not None:
oprot.writeFieldBegin('metadata', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.metadata))
- for iter703 in self.metadata:
- oprot.writeString(iter703)
+ for iter724 in self.metadata:
+ oprot.writeString(iter724)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.type is not None:
@@ -16229,10 +16618,10 @@ class ClearFileMetadataRequest:
if fid == 1:
if ftype == TType.LIST:
self.fileIds = []
- (_etype707, _size704) = iprot.readListBegin()
- for _i708 in xrange(_size704):
- _elem709 = iprot.readI64()
- self.fileIds.append(_elem709)
+ (_etype728, _size725) = iprot.readListBegin()
+ for _i729 in xrange(_size725):
+ _elem730 = iprot.readI64()
+ self.fileIds.append(_elem730)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16249,8 +16638,8 @@ class ClearFileMetadataRequest:
if self.fileIds is not None:
oprot.writeFieldBegin('fileIds', TType.LIST, 1)
oprot.writeListBegin(TType.I64, len(self.fileIds))
- for iter710 in self.fileIds:
- oprot.writeI64(iter710)
+ for iter731 in self.fileIds:
+ oprot.writeI64(iter731)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -16479,11 +16868,11 @@ class GetAllFunctionsResponse:
if fid == 1:
if ftype == TType.LIST:
self.functions = []
- (_etype714, _size711) = iprot.readListBegin()
- for _i715 in xrange(_size711):
- _elem716 = Function()
- _elem716.read(iprot)
- self.functions.append(_elem716)
+ (_etype735, _size732) = iprot.readListBegin()
+ for _i736 in xrange(_size732):
+ _elem737 = Function()
+ _elem737.read(iprot)
+ self.functions.append(_elem737)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16500,8 +16889,8 @@ class GetAllFunctionsResponse:
if self.functions is not None:
oprot.writeFieldBegin('functions', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.functions))
- for iter717 in self.functions:
- iter717.write(oprot)
+ for iter738 in self.functions:
+ iter738.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -16553,10 +16942,10 @@ class ClientCapabilities:
if fid == 1:
if ftype == TType.LIST:
self.values = []
- (_etype721, _size718) = iprot.readListBegin()
- for _i722 in xrange(_size718):
- _elem723 = iprot.readI32()
- self.values.append(_elem723)
+ (_etype742, _size739) = iprot.readListBegin()
+ for _i743 in xrange(_size739):
+ _elem744 = iprot.readI32()
+ self.values.append(_elem744)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16573,8 +16962,8 @@ class ClientCapabilities:
if self.values is not None:
oprot.writeFieldBegin('values', TType.LIST, 1)
oprot.writeListBegin(TType.I32, len(self.values))
- for iter724 in self.values:
- oprot.writeI32(iter724)
+ for iter745 in self.values:
+ oprot.writeI32(iter745)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -16819,10 +17208,10 @@ class GetTablesRequest:
elif fid == 2:
if ftype == TType.LIST:
self.tblNames = []
- (_etype728, _size725) = iprot.readListBegin()
- for _i729 in xrange(_size725):
- _elem730 = iprot.readString()
- self.tblNames.append(_elem730)
+ (_etype749, _size746) = iprot.readListBegin()
+ for _i750 in xrange(_size746):
+ _elem751 = iprot.readString()
+ self.tblNames.append(_elem751)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16854,8 +17243,8 @@ class GetTablesRequest:
if self.tblNames is not None:
oprot.writeFieldBegin('tblNames', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.tblNames))
- for iter731 in self.tblNames:
- oprot.writeString(iter731)
+ for iter752 in self.tblNames:
+ oprot.writeString(iter752)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.capabilities is not None:
@@ -16920,11 +17309,11 @@ class GetTablesResult:
if fid == 1:
if ftype == TType.LIST:
self.tables = []
- (_etype735, _size732) = iprot.readListBegin()
- for _i736 in xrange(_size732):
- _elem737 = Table()
- _elem737.read(iprot)
- self.tables.append(_elem737)
+ (_etype756, _size753) = iprot.readListBegin()
+ for _i757 in xrange(_size753):
+ _elem758 = Table()
+ _elem758.read(iprot)
+ self.tables.append(_elem758)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -16941,8 +17330,8 @@ class GetTablesResult:
if self.tables is not None:
oprot.writeFieldBegin('tables', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.tables))
- for iter738 in self.tables:
- iter738.write(oprot)
+ for iter759 in self.tables:
+ iter759.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -17256,10 +17645,10 @@ class Materialization:
if fid == 1:
if ftype == TType.SET:
self.tablesUsed = set()
- (_etype742, _size739) = iprot.readSetBegin()
- for _i743 in xrange(_size739):
- _elem744 = iprot.readString()
- self.tablesUsed.add(_elem744)
+ (_etype763, _size760) = iprot.readSetBegin()
+ for _i764 in xrange(_size760):
+ _elem765 = iprot.readString()
+ self.tablesUsed.add(_elem765)
iprot.readSetEnd()
else:
iprot.skip(ftype)
@@ -17291,8 +17680,8 @@ class Materialization:
if self.tablesUsed is not None:
oprot.writeFieldBegin('tablesUsed', TType.SET, 1)
oprot.writeSetBegin(TType.STRING, len(self.tablesUsed))
- for iter745 in self.tablesUsed:
- oprot.writeString(iter745)
+ for iter766 in self.tablesUsed:
+ oprot.writeString(iter766)
oprot.writeSetEnd()
oprot.writeFieldEnd()
if self.validTxnList is not None:
@@ -18197,44 +18586,44 @@ class WMFullResourcePlan:
elif fid == 2:
if ftype == TType.LIST:
self.pools = []
- (_etype749, _size746) = iprot.readListBegin()
- for _i750 in xrange(_size746):
- _elem751 = WMPool()
- _elem751.read(iprot)
- self.pools.append(_elem751)
+ (_etype770, _size767) = iprot.readListBegin()
+ for _i771 in xrange(_size767):
+ _elem772 = WMPool()
+ _elem772.read(iprot)
+ self.pools.append(_elem772)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.LIST:
self.mappings = []
- (_etype755, _size752) = iprot.readListBegin()
- for _i756 in xrange(_size752):
- _elem757 = WMMapping()
- _elem757.read(iprot)
- self.mappings.append(_elem757)
+ (_etype776, _size773) = iprot.readListBegin()
+ for _i777 in xrange(_size773):
+ _elem778 = WMMapping()
+ _elem778.read(iprot)
+ self.mappings.append(_elem778)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.LIST:
self.triggers = []
- (_etype761, _size758) = iprot.readListBegin()
- for _i762 in xrange(_size758):
- _elem763 = WMTrigger()
- _elem763.read(iprot)
- self.triggers.append(_elem763)
+ (_etype782, _size779) = iprot.readListBegin()
+ for _i783 in xrange(_size779):
+ _elem784 = WMTrigger()
+ _elem784.read(iprot)
+ self.triggers.append(_elem784)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.LIST:
self.poolTriggers = []
- (_etype767, _size764) = iprot.readListBegin()
- for _i768 in xrange(_size764):
- _elem769 = WMPoolTrigger()
- _elem769.read(iprot)
- self.poolTriggers.append(_elem769)
+ (_etype788, _size785) = iprot.readListBegin()
+ for _i789 in xrange(_size785):
+ _elem790 = WMPoolTrigger()
+ _elem790.read(iprot)
+ self.poolTriggers.append(_elem790)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -18255,29 +18644,29 @@ class WMFullResourcePlan:
if self.pools is not None:
oprot.writeFieldBegin('pools', TType.LIST, 2)
oprot.writeListBegin(TType.STRUCT, len(self.pools))
- for iter770 in self.pools:
- iter770.write(oprot)
+ for iter791 in self.pools:
+ iter791.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.mappings is not None:
oprot.writeFieldBegin('mappings', TType.LIST, 3)
oprot.writeListBegin(TType.STRUCT, len(self.mappings))
- for iter771 in self.mappings:
- iter771.write(oprot)
+ for iter792 in self.mappings:
+ iter792.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.triggers is not None:
oprot.writeFieldBegin('triggers', TType.LIST, 4)
oprot.writeListBegin(TType.STRUCT, len(self.triggers))
- for iter772 in self.triggers:
- iter772.write(oprot)
+ for iter793 in self.triggers:
+ iter793.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.poolTriggers is not None:
oprot.writeFieldBegin('poolTriggers', TType.LIST, 5)
oprot.writeListBegin(TType.STRUCT, len(self.poolTriggers))
- for iter773 in self.poolTriggers:
- iter773.write(oprot)
+ for iter794 in self.poolTriggers:
+ iter794.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -18751,11 +19140,11 @@ class WMGetAllResourcePlanResponse:
if fid == 1:
if ftype == TType.LIST:
self.resourcePlans = []
- (_etype777, _size774) = iprot.readListBegin()
- for _i778 in xrange(_size774):
- _elem779 = WMResourcePlan()
- _elem779.read(iprot)
- self.resourcePlans.append(_elem779)
+ (_etype798, _size795) = iprot.readListBegin()
+ for _i799 in xrange(_size795):
+ _elem800 = WMResourcePlan()
+ _elem800.read(iprot)
+ self.resourcePlans.append(_elem800)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -18772,8 +19161,8 @@ class WMGetAllResourcePlanResponse:
if self.resourcePlans is not None:
oprot.writeFieldBegin('resourcePlans', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.resourcePlans))
- for iter780 in self.resourcePlans:
- iter780.write(oprot)
+ for iter801 in self.resourcePlans:
+ iter801.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -19077,20 +19466,20 @@ class WMValidateResourcePlanResponse:
if fid == 1:
if ftype == TType.LIST:
self.errors = []
- (_etype784, _size781) = iprot.readListBegin()
- for _i785 in xrange(_size781):
- _elem786 = iprot.readString()
- self.errors.append(_elem786)
+ (_etype805, _size802) = iprot.readListBegin()
+ for _i806 in xrange(_size802):
+ _elem807 = iprot.readString()
+ self.errors.append(_elem807)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.LIST:
self.warnings = []
- (_etype790, _size787) = iprot.readListBegin()
- for _i791 in xrange(_size787):
- _elem792 = iprot.readString()
- self.warnings.append(_elem792)
+ (_etype811, _size808) = iprot.readListBegin()
+ for _i812 in xrange(_size808):
+ _elem813 = iprot.readString()
+ self.warnings.append(_elem813)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -19107,15 +19496,15 @@ class WMValidateResourcePlanResponse:
if self.errors is not None:
oprot.writeFieldBegin('errors', TType.LIST, 1)
oprot.writeListBegin(TType.STRING, len(self.errors))
- for iter793 in self.errors:
- oprot.writeString(iter793)
+ for iter814 in self.errors:
+ oprot.writeString(iter814)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.warnings is not None:
oprot.writeFieldBegin('warnings', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.warnings))
- for iter794 in self.warnings:
- oprot.writeString(iter794)
+ for iter815 in self.warnings:
+ oprot.writeString(iter815)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -19692,11 +20081,11 @@ class WMGetTriggersForResourePlanResponse:
if fid == 1:
if ftype == TType.LIST:
self.triggers = []
- (_etype798, _size795) = iprot.readListBegin()
- for _i799 in xrange(_size795):
- _elem800 = WMTrigger()
- _elem800.read(iprot)
- self.triggers.append(_elem800)
+ (_etype819, _size816) = iprot.readListBegin()
+ for _i820 in xrange(_size816):
+ _elem821 = WMTrigger()
+ _elem821.read(iprot)
+ self.triggers.append(_elem821)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -19713,8 +20102,8 @@ class WMGetTriggersForResourePlanResponse:
if self.triggers is not None:
oprot.writeFieldBegin('triggers', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.triggers))
- for iter801 in self.triggers:
- iter801.write(oprot)
+ for iter822 in self.triggers:
+ iter822.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -20898,11 +21287,11 @@ class SchemaVersion:
elif fid == 4:
if ftype == TType.LIST:
self.cols = []
- (_etype805, _size802) = iprot.readListBegin()
- for _i806 in xrange(_size802):
- _elem807 = FieldSchema()
- _elem807.read(iprot)
- self.cols.append(_elem807)
+ (_etype826, _size823) = iprot.readListBegin()
+ for _i827 in xrange(_size823):
+ _elem828 = FieldSchema()
+ _elem828.read(iprot)
+ self.cols.append(_elem828)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -20962,8 +21351,8 @@ class SchemaVersion:
if self.cols is not None:
oprot.writeFieldBegin('cols', TType.LIST, 4)
oprot.writeListBegin(TType.STRUCT, len(self.cols))
- for iter808 in self.cols:
- iter808.write(oprot)
+ for iter829 in self.cols:
+ iter829.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.state is not None:
@@ -21218,11 +21607,11 @@ class FindSchemasByColsResp:
if fid == 1:
if ftype == TType.LIST:
self.schemaVersions = []
- (_etype812, _size809) = iprot.readListBegin()
- for _i813 in xrange(_size809):
- _elem814 = SchemaVersionDescriptor()
- _elem814.read(iprot)
- self.schemaVersions.append(_elem814)
+ (_etype833, _size830) = iprot.readListBegin()
+ for _i834 in xrange(_size830):
+ _elem835 = SchemaVersionDescriptor()
+ _elem835.read(iprot)
+ self.schemaVersions.append(_elem835)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -21239,8 +21628,8 @@ class FindSchemasByColsResp:
if self.schemaVersions is not None:
oprot.writeFieldBegin('schemaVersions', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.schemaVersions))
- for iter815 in self.schemaVersions:
- iter815.write(oprot)
+ for iter836 in self.schemaVersions:
+ iter836.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index fc640d0..0348ff2 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2582,10 +2582,12 @@ class CommitTxnRequest
include ::Thrift::Struct, ::Thrift::Struct_Union
TXNID = 1
REPLPOLICY = 2
+ WRITEEVENTINFOS = 3
FIELDS = {
TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid'},
- REPLPOLICY => {:type => ::Thrift::Types::STRING, :name => 'replPolicy', :optional => true}
+ REPLPOLICY => {:type => ::Thrift::Types::STRING, :name => 'replPolicy', :optional => true},
+ WRITEEVENTINFOS => {:type => ::Thrift::Types::LIST, :name => 'writeEventInfos', :element => {:type => ::Thrift::Types::STRUCT, :class => ::WriteEventInfo}, :optional => true}
}
def struct_fields; FIELDS; end
@@ -2597,6 +2599,38 @@ class CommitTxnRequest
::Thrift::Struct.generate_accessors self
end
+class WriteEventInfo
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ WRITEID = 1
+ DATABASE = 2
+ TABLE = 3
+ FILES = 4
+ PARTITION = 5
+ TABLEOBJ = 6
+ PARTITIONOBJ = 7
+
+ FIELDS = {
+ WRITEID => {:type => ::Thrift::Types::I64, :name => 'writeId'},
+ DATABASE => {:type => ::Thrift::Types::STRING, :name => 'database'},
+ TABLE => {:type => ::Thrift::Types::STRING, :name => 'table'},
+ FILES => {:type => ::Thrift::Types::STRING, :name => 'files'},
+ PARTITION => {:type => ::Thrift::Types::STRING, :name => 'partition', :optional => true},
+ TABLEOBJ => {:type => ::Thrift::Types::STRING, :name => 'tableObj', :optional => true},
+ PARTITIONOBJ => {:type => ::Thrift::Types::STRING, :name => 'partitionObj', :optional => true}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field writeId is unset!') unless @writeId
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field database is unset!') unless @database
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field table is unset!') unless @table
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field files is unset!') unless @files
+ end
+
+ ::Thrift::Struct.generate_accessors self
+end
+
class ReplTblWriteIdStateRequest
include ::Thrift::Struct, ::Thrift::Struct_Union
VALIDWRITEIDLIST = 1
@@ -3395,11 +3429,13 @@ class InsertEventRequestData
REPLACE = 1
FILESADDED = 2
FILESADDEDCHECKSUM = 3
+ SUBDIRECTORYLIST = 4
FIELDS = {
REPLACE => {:type => ::Thrift::Types::BOOL, :name => 'replace', :optional => true},
FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element => {:type => ::Thrift::Types::STRING}},
- FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING}, :optional => true}
+ FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum', :element => {:type => ::Thrift::Types::STRING}, :optional => true},
+ SUBDIRECTORYLIST => {:type => ::Thrift::Types::LIST, :name => 'subDirectoryList', :element => {:type => ::Thrift::Types::STRING}, :optional => true}
}
def struct_fields; FIELDS; end
@@ -3477,6 +3513,52 @@ class FireEventResponse
::Thrift::Struct.generate_accessors self
end
+class WriteNotificationLogRequest
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ TXNID = 1
+ WRITEID = 2
+ DB = 3
+ TABLE = 4
+ FILEINFO = 5
+ PARTITIONVALS = 6
+
+ FIELDS = {
+ TXNID => {:type => ::Thrift::Types::I64, :name => 'txnId'},
+ WRITEID => {:type => ::Thrift::Types::I64, :name => 'writeId'},
+ DB => {:type => ::Thrift::Types::STRING, :name => 'db'},
+ TABLE => {:type => ::Thrift::Types::STRING, :name => 'table'},
+ FILEINFO => {:type => ::Thrift::Types::STRUCT, :name => 'fileInfo', :class => ::InsertEventRequestData},
+ PARTITIONVALS => {:type => ::Thrift::Types::LIST, :name => 'partitionVals', :element => {:type => ::Thrift::Types::STRING}, :optional => true}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field txnId is unset!') unless @txnId
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field writeId is unset!') unless @writeId
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field db is unset!') unless @db
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field table is unset!') unless @table
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field fileInfo is unset!') unless @fileInfo
+ end
+
+ ::Thrift::Struct.generate_accessors self
+end
+
+class WriteNotificationLogResponse
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+
+ FIELDS = {
+
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+end
+
class MetadataPpdResult
include ::Thrift::Struct, ::Thrift::Struct_Union
METADATA = 1
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index bbf3f12..2bd958e 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -2751,6 +2751,21 @@ module ThriftHiveMetastore
return
end
+ def add_write_notification_log(rqst)
+ send_add_write_notification_log(rqst)
+ return recv_add_write_notification_log()
+ end
+
+ def send_add_write_notification_log(rqst)
+ send_message('add_write_notification_log', Add_write_notification_log_args, :rqst => rqst)
+ end
+
+ def recv_add_write_notification_log()
+ result = receive_message(Add_write_notification_log_result)
+ return result.success unless result.success.nil?
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'add_write_notification_log failed: unknown result')
+ end
+
def cm_recycle(request)
send_cm_recycle(request)
return recv_cm_recycle()
@@ -5520,6 +5535,13 @@ module ThriftHiveMetastore
write_result(result, oprot, 'flushCache', seqid)
end
+ def process_add_write_notification_log(seqid, iprot, oprot)
+ args = read_args(iprot, Add_write_notification_log_args)
+ result = Add_write_notification_log_result.new()
+ result.success = @handler.add_write_notification_log(args.rqst)
+ write_result(result, oprot, 'add_write_notification_log', seqid)
+ end
+
def process_cm_recycle(seqid, iprot, oprot)
args = read_args(iprot, Cm_recycle_args)
result = Cm_recycle_result.new()
@@ -12220,6 +12242,38 @@ module ThriftHiveMetastore
::Thrift::Struct.generate_accessors self
end
+ class Add_write_notification_log_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ RQST = -1
+
+ FIELDS = {
+ RQST => {:type => ::Thrift::Types::STRUCT, :name => 'rqst', :class => ::WriteNotificationLogRequest}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Add_write_notification_log_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ SUCCESS = 0
+
+ FIELDS = {
+ SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::WriteNotificationLogResponse}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
class Cm_recycle_args
include ::Thrift::Struct, ::Thrift::Struct_Union
REQUEST = 1
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 1327fa2..c6c04b7 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -81,6 +81,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.TableName;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.events.AddForeignKeyEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.StatsUpdateMode;
@@ -7169,6 +7170,55 @@ public class HiveMetaStore extends ThriftHiveMetastore {
@Override
public void commit_txn(CommitTxnRequest rqst) throws TException {
+ // in replication flow, the write notification log table will be updated here.
+ if (rqst.isSetWriteEventInfos()) {
+ long targetTxnId = getTxnHandler().getTargetTxnId(rqst.getReplPolicy(), rqst.getTxnid());
+ if (targetTxnId < 0) {
+ //looks like a retry
+ return;
+ }
+ for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
+ String[] filesAdded = ReplChangeManager.getListFromSeparatedString(writeEventInfo.getFiles());
+ List<String> partitionValue = null;
+ Partition ptnObj = null;
+ String root;
+ Table tbl = getTblObject(writeEventInfo.getDatabase(), writeEventInfo.getTable());
+
+ if (writeEventInfo.getPartition() != null && !writeEventInfo.getPartition().isEmpty()) {
+ partitionValue = Warehouse.getPartValuesFromPartName(writeEventInfo.getPartition());
+ ptnObj = getPartitionObj(writeEventInfo.getDatabase(), writeEventInfo.getTable(), partitionValue, tbl);
+ root = ptnObj.getSd().getLocation();
+ } else {
+ root = tbl.getSd().getLocation();
+ }
+
+ InsertEventRequestData insertData = new InsertEventRequestData();
+ insertData.setReplace(true);
+
+ // The files in the commit txn message during load will have files with path corresponding to source
+ // warehouse. Need to transform them to target warehouse using table or partition object location.
+ for (String file : filesAdded) {
+ String[] decodedPath = ReplChangeManager.decodeFileUri(file);
+ String name = (new Path(decodedPath[0])).getName();
+ Path newPath = FileUtils.getTransformedPath(name, decodedPath[3], root);
+ insertData.addToFilesAdded(newPath.toUri().toString());
+ insertData.addToSubDirectoryList(decodedPath[3]);
+ try {
+ insertData.addToFilesAddedChecksum(ReplChangeManager.checksumFor(newPath, newPath.getFileSystem(conf)));
+ } catch (IOException e) {
+ LOG.error("failed to get checksum for the file " + newPath + " with error: " + e.getMessage());
+ throw new TException(e.getMessage());
+ }
+ }
+
+ WriteNotificationLogRequest wnRqst = new WriteNotificationLogRequest(targetTxnId,
+ writeEventInfo.getWriteId(), writeEventInfo.getDatabase(), writeEventInfo.getTable(), insertData);
+ if (partitionValue != null) {
+ wnRqst.setPartitionVals(partitionValue);
+ }
+ addTxnWriteNotificationLog(tbl, ptnObj, wnRqst);
+ }
+ }
getTxnHandler().commitTxn(rqst);
if (listeners != null && !listeners.isEmpty()) {
MetaStoreListenerNotifier.notifyEvent(listeners, EventType.COMMIT_TXN,
@@ -7198,6 +7248,42 @@ public class HiveMetaStore extends ThriftHiveMetastore {
return response;
}
+ private void addTxnWriteNotificationLog(Table tableObj, Partition ptnObj, WriteNotificationLogRequest rqst)
+ throws MetaException {
+ String partition = ""; //Empty string is an invalid partition name. Can be used for non partitioned table.
+ if (ptnObj != null) {
+ partition = Warehouse.makePartName(tableObj.getPartitionKeys(), rqst.getPartitionVals());
+ }
+ AcidWriteEvent event = new AcidWriteEvent(partition, tableObj, ptnObj, rqst);
+ getTxnHandler().addWriteNotificationLog(event);
+ if (listeners != null && !listeners.isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(listeners, EventType.ACID_WRITE, event);
+ }
+ }
+
+ private Table getTblObject(String db, String table) throws MetaException, NoSuchObjectException {
+ GetTableRequest req = new GetTableRequest(db, table);
+ req.setCapabilities(new ClientCapabilities(Lists.newArrayList(ClientCapability.TEST_CAPABILITY)));
+ return get_table_req(req).getTable();
+ }
+
+ private Partition getPartitionObj(String db, String table, List<String> partitionVals, Table tableObj)
+ throws MetaException, NoSuchObjectException {
+ if (tableObj.isSetPartitionKeys() && !tableObj.getPartitionKeys().isEmpty()) {
+ return get_partition(db, table, partitionVals);
+ }
+ return null;
+ }
+
+ @Override
+ public WriteNotificationLogResponse add_write_notification_log(WriteNotificationLogRequest rqst)
+ throws MetaException, NoSuchObjectException {
+ Table tableObj = getTblObject(rqst.getDb(), rqst.getTable());
+ Partition ptnObj = getPartitionObj(rqst.getDb(), rqst.getTable(), rqst.getPartitionVals(), tableObj);
+ addTxnWriteNotificationLog(tableObj, ptnObj, rqst);
+ return new WriteNotificationLogResponse();
+ }
+
@Override
public LockResponse lock(LockRequest rqst) throws TException {
return getTxnHandler().lock(rqst);
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index da41e6e..bfd7141 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2506,10 +2506,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
}
@Override
- public void replCommitTxn(long srcTxnId, String replPolicy)
+ public void replCommitTxn(CommitTxnRequest rqst)
throws NoSuchTxnException, TxnAbortedException, TException {
- CommitTxnRequest rqst = new CommitTxnRequest(srcTxnId);
- rqst.setReplPolicy(replPolicy);
client.commit_txn(rqst);
}
@@ -2756,6 +2754,12 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
return client.fire_listener_event(rqst);
}
+ @InterfaceAudience.LimitedPrivate({"Apache Hive, HCatalog"})
+ @Override
+ public void addWriteNotificationLog(WriteNotificationLogRequest rqst) throws TException {
+ client.add_write_notification_log(rqst);
+ }
+
/**
* Creates a synchronized wrapper for any {@link IMetaStoreClient}.
* This may be used by multi-threaded applications until we have
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index bc09076..b5d147b 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.CmRecycleRequest;
import org.apache.hadoop.hive.metastore.api.CmRecycleResponse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
@@ -125,6 +126,7 @@ import org.apache.hadoop.hive.metastore.api.WMPool;
import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.ObjectPair;
import org.apache.thrift.TException;
@@ -2871,8 +2873,8 @@ public interface IMetaStoreClient {
/**
* Commit a transaction. This will also unlock any locks associated with
* this transaction.
- * @param srcTxnid id of transaction at source which is committed and to be replicated.
- * @param replPolicy the replication policy to identify the source cluster
+ * @param rqst Information containing the txn info and write event information
+ * of transaction at source which is committed and to be replicated
* @throws NoSuchTxnException if the requested transaction does not exist.
* This can result fro the transaction having timed out and been deleted by
* the compactor.
@@ -2880,7 +2882,7 @@ public interface IMetaStoreClient {
* aborted. This can result from the transaction timing out.
* @throws TException
*/
- void replCommitTxn(long srcTxnid, String replPolicy)
+ void replCommitTxn(CommitTxnRequest rqst)
throws NoSuchTxnException, TxnAbortedException, TException;
/**
@@ -3193,6 +3195,14 @@ public interface IMetaStoreClient {
@InterfaceAudience.LimitedPrivate({"Apache Hive, HCatalog"})
FireEventResponse fireListenerEvent(FireEventRequest request) throws TException;
+ /**
+ * Add an event related to write operations in an ACID table.
+ * @param rqst message containing information for acid write operation.
+ * @throws TException
+ */
+ @InterfaceAudience.LimitedPrivate({"Apache Hive, HCatalog"})
+ void addWriteNotificationLog(WriteNotificationLogRequest rqst) throws TException;
+
class IncompatibleMetastoreException extends MetaException {
IncompatibleMetastoreException(String message) {
super(message);
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
index e0e65cf..de226bf 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreEventListener.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
import java.sql.Connection;
@@ -282,6 +283,17 @@ public abstract class MetaStoreEventListener implements Configurable {
throws MetaException {
}
+ /**
+ * This will be called to notify the listener of an ACID write operation.
+ * @param acidWriteEvent event to be processed
+ * @param dbConn jdbc connection to remote meta store db.
+ * @param sqlGenerator helper class to generate db specific sql string.
+ * @throws MetaException
+ */
+ public void onAcidWrite(AcidWriteEvent acidWriteEvent, Connection dbConn, SQLGenerator sqlGenerator)
+ throws MetaException {
+ }
+
@Override
public Configuration getConf() {
return this.conf;
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
index 3cf8314..c296f57 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreListenerNotifier.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
import java.sql.Connection;
import java.util.List;
@@ -221,6 +222,8 @@ public class MetaStoreListenerNotifier {
(listener, event) -> listener.onAbortTxn((AbortTxnEvent) event, null, null))
.put(EventType.ALLOC_WRITE_ID,
(listener, event) -> listener.onAllocWriteId((AllocWriteIdEvent) event, null, null))
+ .put(EventType.ACID_WRITE,
+ (listener, event) -> listener.onAcidWrite((AcidWriteEvent) event, null, null))
.build()
);
@@ -241,6 +244,9 @@ public class MetaStoreListenerNotifier {
.put(EventType.ALLOC_WRITE_ID,
(listener, event, dbConn, sqlGenerator) ->
listener.onAllocWriteId((AllocWriteIdEvent) event, dbConn, sqlGenerator))
+ .put(EventType.ACID_WRITE,
+ (listener, event, dbConn, sqlGenerator) ->
+ listener.onAcidWrite((AcidWriteEvent) event, dbConn, sqlGenerator))
.build()
);
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 7490243..8721022 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -159,6 +159,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus;
import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
@@ -206,6 +207,7 @@ import org.apache.hadoop.hive.metastore.model.MWMPool;
import org.apache.hadoop.hive.metastore.model.MWMResourcePlan;
import org.apache.hadoop.hive.metastore.model.MWMResourcePlan.Status;
import org.apache.hadoop.hive.metastore.model.MWMTrigger;
+import org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree;
import org.apache.hadoop.hive.metastore.parser.ExpressionTree.FilterBuilder;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
@@ -9611,6 +9613,64 @@ public class ObjectStore implements RawStore, Configurable {
}
}
+ @Override
+ public void cleanWriteNotificationEvents(int olderThan) {
+ boolean commited = false;
+ Query query = null;
+ try {
+ openTransaction();
+ long tmp = System.currentTimeMillis() / 1000 - olderThan;
+ int tooOld = (tmp > Integer.MAX_VALUE) ? 0 : (int) tmp;
+ query = pm.newQuery(MTxnWriteNotificationLog.class, "eventTime < tooOld");
+ query.declareParameters("java.lang.Integer tooOld");
+ Collection<MTxnWriteNotificationLog> toBeRemoved = (Collection) query.execute(tooOld);
+ if (CollectionUtils.isNotEmpty(toBeRemoved)) {
+ pm.deletePersistentAll(toBeRemoved);
+ }
+ commited = commitTransaction();
+ } finally {
+ rollbackAndCleanup(commited, query);
+ }
+ }
+
+ @Override
+ public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+ List<WriteEventInfo> writeEventInfoList = null;
+ boolean commited = false;
+ Query query = null;
+ try {
+ openTransaction();
+ List<String> parameterVals = new ArrayList<>();
+ StringBuilder filterBuilder = new StringBuilder(" txnId == " + Long.toString(txnId));
+ if (dbName != null && !"*".equals(dbName)) { // * means get all database, so no need to add filter
+ appendSimpleCondition(filterBuilder, "database", new String[]{dbName}, parameterVals);
+ }
+ if (tableName != null && !"*".equals(tableName)) {
+ appendSimpleCondition(filterBuilder, "table", new String[]{tableName}, parameterVals);
+ }
+ query = pm.newQuery(MTxnWriteNotificationLog.class, filterBuilder.toString());
+ query.setOrdering("database,table ascending");
+ List<MTxnWriteNotificationLog> mplans = (List<MTxnWriteNotificationLog>)query.executeWithArray(
+ parameterVals.toArray(new String[parameterVals.size()]));
+ pm.retrieveAll(mplans);
+ commited = commitTransaction();
+ if (mplans != null && mplans.size() > 0) {
+ writeEventInfoList = Lists.newArrayList();
+ for (MTxnWriteNotificationLog mplan : mplans) {
+ WriteEventInfo writeEventInfo = new WriteEventInfo(mplan.getWriteId(), mplan.getDatabase(),
+ mplan.getTable(), mplan.getFiles());
+ writeEventInfo.setPartition(mplan.getPartition());
+ writeEventInfo.setPartitionObj(mplan.getPartObject());
+ writeEventInfo.setTableObj(mplan.getTableObject());
+ writeEventInfoList.add(writeEventInfo);
+ }
+ }
+ } finally {
+ rollbackAndCleanup(commited, query);
+ }
+ return writeEventInfoList;
+ }
+
private void prepareQuotes() throws SQLException {
if (dbType == DatabaseProduct.MYSQL) {
assert pm.currentTransaction().isActive();
http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index d019941..73a518d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.metastore.api.CreationMetadata;
import org.apache.hadoop.hive.metastore.api.ISchemaName;
import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
@@ -1665,4 +1666,17 @@ public interface RawStore extends Configurable {
Map<String, List<String>> getPartitionColsWithStats(String catName, String dbName,
String tableName) throws MetaException, NoSuchObjectException;
+ /**
+ * Remove older notification events.
+ * @param olderThan Remove any events older than a given number of seconds
+ */
+ void cleanWriteNotificationEvents(int olderThan);
+
+ /**
+ * Get all write events for a specific transaction.
+ * @param txnId get all the events done by this transaction
+ * @param dbName the name of db for which dump is being taken
+ * @param tableName the name of the table for which the dump is being taken
+ */
+ List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException;
}