You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by we...@apache.org on 2017/05/16 22:53:13 UTC
[08/18] hive git commit: HIVE-14879 : integrate MM tables into ACID:
replace MM metastore calls and structures with ACID ones (Wei Zheng)
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 3d20125..42aaa9c 100644
--- a/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -3137,8 +3137,6 @@ class Table:
- privileges
- temporary
- rewriteEnabled
- - mmNextWriteId
- - mmWatermarkWriteId
"""
thrift_spec = (
@@ -3158,11 +3156,9 @@ class Table:
(13, TType.STRUCT, 'privileges', (PrincipalPrivilegeSet, PrincipalPrivilegeSet.thrift_spec), None, ), # 13
(14, TType.BOOL, 'temporary', None, False, ), # 14
(15, TType.BOOL, 'rewriteEnabled', None, None, ), # 15
- (16, TType.I64, 'mmNextWriteId', None, None, ), # 16
- (17, TType.I64, 'mmWatermarkWriteId', None, None, ), # 17
)
- def __init__(self, tableName=None, dbName=None, owner=None, createTime=None, lastAccessTime=None, retention=None, sd=None, partitionKeys=None, parameters=None, viewOriginalText=None, viewExpandedText=None, tableType=None, privileges=None, temporary=thrift_spec[14][4], rewriteEnabled=None, mmNextWriteId=None, mmWatermarkWriteId=None,):
+ def __init__(self, tableName=None, dbName=None, owner=None, createTime=None, lastAccessTime=None, retention=None, sd=None, partitionKeys=None, parameters=None, viewOriginalText=None, viewExpandedText=None, tableType=None, privileges=None, temporary=thrift_spec[14][4], rewriteEnabled=None,):
self.tableName = tableName
self.dbName = dbName
self.owner = owner
@@ -3178,8 +3174,6 @@ class Table:
self.privileges = privileges
self.temporary = temporary
self.rewriteEnabled = rewriteEnabled
- self.mmNextWriteId = mmNextWriteId
- self.mmWatermarkWriteId = mmWatermarkWriteId
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -3279,16 +3273,6 @@ class Table:
self.rewriteEnabled = iprot.readBool()
else:
iprot.skip(ftype)
- elif fid == 16:
- if ftype == TType.I64:
- self.mmNextWriteId = iprot.readI64()
- else:
- iprot.skip(ftype)
- elif fid == 17:
- if ftype == TType.I64:
- self.mmWatermarkWriteId = iprot.readI64()
- else:
- iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -3366,14 +3350,6 @@ class Table:
oprot.writeFieldBegin('rewriteEnabled', TType.BOOL, 15)
oprot.writeBool(self.rewriteEnabled)
oprot.writeFieldEnd()
- if self.mmNextWriteId is not None:
- oprot.writeFieldBegin('mmNextWriteId', TType.I64, 16)
- oprot.writeI64(self.mmNextWriteId)
- oprot.writeFieldEnd()
- if self.mmWatermarkWriteId is not None:
- oprot.writeFieldBegin('mmWatermarkWriteId', TType.I64, 17)
- oprot.writeI64(self.mmWatermarkWriteId)
- oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -3398,8 +3374,6 @@ class Table:
value = (value * 31) ^ hash(self.privileges)
value = (value * 31) ^ hash(self.temporary)
value = (value * 31) ^ hash(self.rewriteEnabled)
- value = (value * 31) ^ hash(self.mmNextWriteId)
- value = (value * 31) ^ hash(self.mmWatermarkWriteId)
return value
def __repr__(self):
@@ -12455,22 +12429,19 @@ class CacheFileMetadataRequest:
def __ne__(self, other):
return not (self == other)
-class GetNextWriteIdRequest:
+class GetAllFunctionsResponse:
"""
Attributes:
- - dbName
- - tblName
+ - functions
"""
thrift_spec = (
None, # 0
- (1, TType.STRING, 'dbName', None, None, ), # 1
- (2, TType.STRING, 'tblName', None, None, ), # 2
+ (1, TType.LIST, 'functions', (TType.STRUCT,(Function, Function.thrift_spec)), None, ), # 1
)
- def __init__(self, dbName=None, tblName=None,):
- self.dbName = dbName
- self.tblName = tblName
+ def __init__(self, functions=None,):
+ self.functions = functions
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -12482,13 +12453,14 @@ class GetNextWriteIdRequest:
if ftype == TType.STOP:
break
if fid == 1:
- if ftype == TType.STRING:
- self.dbName = iprot.readString()
- else:
- iprot.skip(ftype)
- elif fid == 2:
- if ftype == TType.STRING:
- self.tblName = iprot.readString()
+ if ftype == TType.LIST:
+ self.functions = []
+ (_etype569, _size566) = iprot.readListBegin()
+ for _i570 in xrange(_size566):
+ _elem571 = Function()
+ _elem571.read(iprot)
+ self.functions.append(_elem571)
+ iprot.readListEnd()
else:
iprot.skip(ftype)
else:
@@ -12500,30 +12472,24 @@ class GetNextWriteIdRequest:
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
- oprot.writeStructBegin('GetNextWriteIdRequest')
- if self.dbName is not None:
- oprot.writeFieldBegin('dbName', TType.STRING, 1)
- oprot.writeString(self.dbName)
- oprot.writeFieldEnd()
- if self.tblName is not None:
- oprot.writeFieldBegin('tblName', TType.STRING, 2)
- oprot.writeString(self.tblName)
+ oprot.writeStructBegin('GetAllFunctionsResponse')
+ if self.functions is not None:
+ oprot.writeFieldBegin('functions', TType.LIST, 1)
+ oprot.writeListBegin(TType.STRUCT, len(self.functions))
+ for iter572 in self.functions:
+ iter572.write(oprot)
+ oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
- if self.dbName is None:
- raise TProtocol.TProtocolException(message='Required field dbName is unset!')
- if self.tblName is None:
- raise TProtocol.TProtocolException(message='Required field tblName is unset!')
return
def __hash__(self):
value = 17
- value = (value * 31) ^ hash(self.dbName)
- value = (value * 31) ^ hash(self.tblName)
+ value = (value * 31) ^ hash(self.functions)
return value
def __repr__(self):
@@ -12537,19 +12503,19 @@ class GetNextWriteIdRequest:
def __ne__(self, other):
return not (self == other)
-class GetNextWriteIdResult:
+class ClientCapabilities:
"""
Attributes:
- - writeId
+ - values
"""
thrift_spec = (
None, # 0
- (1, TType.I64, 'writeId', None, None, ), # 1
+ (1, TType.LIST, 'values', (TType.I32,None), None, ), # 1
)
- def __init__(self, writeId=None,):
- self.writeId = writeId
+ def __init__(self, values=None,):
+ self.values = values
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -12561,8 +12527,13 @@ class GetNextWriteIdResult:
if ftype == TType.STOP:
break
if fid == 1:
- if ftype == TType.I64:
- self.writeId = iprot.readI64()
+ if ftype == TType.LIST:
+ self.values = []
+ (_etype576, _size573) = iprot.readListBegin()
+ for _i577 in xrange(_size573):
+ _elem578 = iprot.readI32()
+ self.values.append(_elem578)
+ iprot.readListEnd()
else:
iprot.skip(ftype)
else:
@@ -12574,23 +12545,26 @@ class GetNextWriteIdResult:
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
- oprot.writeStructBegin('GetNextWriteIdResult')
- if self.writeId is not None:
- oprot.writeFieldBegin('writeId', TType.I64, 1)
- oprot.writeI64(self.writeId)
+ oprot.writeStructBegin('ClientCapabilities')
+ if self.values is not None:
+ oprot.writeFieldBegin('values', TType.LIST, 1)
+ oprot.writeListBegin(TType.I32, len(self.values))
+ for iter579 in self.values:
+ oprot.writeI32(iter579)
+ oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
- if self.writeId is None:
- raise TProtocol.TProtocolException(message='Required field writeId is unset!')
+ if self.values is None:
+ raise TProtocol.TProtocolException(message='Required field values is unset!')
return
def __hash__(self):
value = 17
- value = (value * 31) ^ hash(self.writeId)
+ value = (value * 31) ^ hash(self.values)
return value
def __repr__(self):
@@ -12604,28 +12578,25 @@ class GetNextWriteIdResult:
def __ne__(self, other):
return not (self == other)
-class FinalizeWriteIdRequest:
+class GetTableRequest:
"""
Attributes:
- dbName
- tblName
- - writeId
- - commit
+ - capabilities
"""
thrift_spec = (
None, # 0
(1, TType.STRING, 'dbName', None, None, ), # 1
(2, TType.STRING, 'tblName', None, None, ), # 2
- (3, TType.I64, 'writeId', None, None, ), # 3
- (4, TType.BOOL, 'commit', None, None, ), # 4
+ (3, TType.STRUCT, 'capabilities', (ClientCapabilities, ClientCapabilities.thrift_spec), None, ), # 3
)
- def __init__(self, dbName=None, tblName=None, writeId=None, commit=None,):
+ def __init__(self, dbName=None, tblName=None, capabilities=None,):
self.dbName = dbName
self.tblName = tblName
- self.writeId = writeId
- self.commit = commit
+ self.capabilities = capabilities
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -12647,13 +12618,9 @@ class FinalizeWriteIdRequest:
else:
iprot.skip(ftype)
elif fid == 3:
- if ftype == TType.I64:
- self.writeId = iprot.readI64()
- else:
- iprot.skip(ftype)
- elif fid == 4:
- if ftype == TType.BOOL:
- self.commit = iprot.readBool()
+ if ftype == TType.STRUCT:
+ self.capabilities = ClientCapabilities()
+ self.capabilities.read(iprot)
else:
iprot.skip(ftype)
else:
@@ -12665,7 +12632,7 @@ class FinalizeWriteIdRequest:
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
- oprot.writeStructBegin('FinalizeWriteIdRequest')
+ oprot.writeStructBegin('GetTableRequest')
if self.dbName is not None:
oprot.writeFieldBegin('dbName', TType.STRING, 1)
oprot.writeString(self.dbName)
@@ -12674,13 +12641,9 @@ class FinalizeWriteIdRequest:
oprot.writeFieldBegin('tblName', TType.STRING, 2)
oprot.writeString(self.tblName)
oprot.writeFieldEnd()
- if self.writeId is not None:
- oprot.writeFieldBegin('writeId', TType.I64, 3)
- oprot.writeI64(self.writeId)
- oprot.writeFieldEnd()
- if self.commit is not None:
- oprot.writeFieldBegin('commit', TType.BOOL, 4)
- oprot.writeBool(self.commit)
+ if self.capabilities is not None:
+ oprot.writeFieldBegin('capabilities', TType.STRUCT, 3)
+ self.capabilities.write(oprot)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -12690,10 +12653,6 @@ class FinalizeWriteIdRequest:
raise TProtocol.TProtocolException(message='Required field dbName is unset!')
if self.tblName is None:
raise TProtocol.TProtocolException(message='Required field tblName is unset!')
- if self.writeId is None:
- raise TProtocol.TProtocolException(message='Required field writeId is unset!')
- if self.commit is None:
- raise TProtocol.TProtocolException(message='Required field commit is unset!')
return
@@ -12701,8 +12660,7 @@ class FinalizeWriteIdRequest:
value = 17
value = (value * 31) ^ hash(self.dbName)
value = (value * 31) ^ hash(self.tblName)
- value = (value * 31) ^ hash(self.writeId)
- value = (value * 31) ^ hash(self.commit)
+ value = (value * 31) ^ hash(self.capabilities)
return value
def __repr__(self):
@@ -12716,11 +12674,20 @@ class FinalizeWriteIdRequest:
def __ne__(self, other):
return not (self == other)
-class FinalizeWriteIdResult:
+class GetTableResult:
+ """
+ Attributes:
+ - table
+ """
thrift_spec = (
+ None, # 0
+ (1, TType.STRUCT, 'table', (Table, Table.thrift_spec), None, ), # 1
)
+ def __init__(self, table=None,):
+ self.table = table
+
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
@@ -12730,6 +12697,12 @@ class FinalizeWriteIdResult:
(fname, ftype, fid) = iprot.readFieldBegin()
if ftype == TType.STOP:
break
+ if fid == 1:
+ if ftype == TType.STRUCT:
+ self.table = Table()
+ self.table.read(iprot)
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -12739,16 +12712,23 @@ class FinalizeWriteIdResult:
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
- oprot.writeStructBegin('FinalizeWriteIdResult')
+ oprot.writeStructBegin('GetTableResult')
+ if self.table is not None:
+ oprot.writeFieldBegin('table', TType.STRUCT, 1)
+ self.table.write(oprot)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
+ if self.table is None:
+ raise TProtocol.TProtocolException(message='Required field table is unset!')
return
def __hash__(self):
value = 17
+ value = (value * 31) ^ hash(self.table)
return value
def __repr__(self):
@@ -12762,25 +12742,25 @@ class FinalizeWriteIdResult:
def __ne__(self, other):
return not (self == other)
-class HeartbeatWriteIdRequest:
+class GetTablesRequest:
"""
Attributes:
- dbName
- - tblName
- - writeId
+ - tblNames
+ - capabilities
"""
thrift_spec = (
None, # 0
(1, TType.STRING, 'dbName', None, None, ), # 1
- (2, TType.STRING, 'tblName', None, None, ), # 2
- (3, TType.I64, 'writeId', None, None, ), # 3
+ (2, TType.LIST, 'tblNames', (TType.STRING,None), None, ), # 2
+ (3, TType.STRUCT, 'capabilities', (ClientCapabilities, ClientCapabilities.thrift_spec), None, ), # 3
)
- def __init__(self, dbName=None, tblName=None, writeId=None,):
+ def __init__(self, dbName=None, tblNames=None, capabilities=None,):
self.dbName = dbName
- self.tblName = tblName
- self.writeId = writeId
+ self.tblNames = tblNames
+ self.capabilities = capabilities
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -12797,13 +12777,19 @@ class HeartbeatWriteIdRequest:
else:
iprot.skip(ftype)
elif fid == 2:
- if ftype == TType.STRING:
- self.tblName = iprot.readString()
+ if ftype == TType.LIST:
+ self.tblNames = []
+ (_etype583, _size580) = iprot.readListBegin()
+ for _i584 in xrange(_size580):
+ _elem585 = iprot.readString()
+ self.tblNames.append(_elem585)
+ iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 3:
- if ftype == TType.I64:
- self.writeId = iprot.readI64()
+ if ftype == TType.STRUCT:
+ self.capabilities = ClientCapabilities()
+ self.capabilities.read(iprot)
else:
iprot.skip(ftype)
else:
@@ -12815,18 +12801,21 @@ class HeartbeatWriteIdRequest:
if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
- oprot.writeStructBegin('HeartbeatWriteIdRequest')
+ oprot.writeStructBegin('GetTablesRequest')
if self.dbName is not None:
oprot.writeFieldBegin('dbName', TType.STRING, 1)
oprot.writeString(self.dbName)
oprot.writeFieldEnd()
- if self.tblName is not None:
- oprot.writeFieldBegin('tblName', TType.STRING, 2)
- oprot.writeString(self.tblName)
+ if self.tblNames is not None:
+ oprot.writeFieldBegin('tblNames', TType.LIST, 2)
+ oprot.writeListBegin(TType.STRING, len(self.tblNames))
+ for iter586 in self.tblNames:
+ oprot.writeString(iter586)
+ oprot.writeListEnd()
oprot.writeFieldEnd()
- if self.writeId is not None:
- oprot.writeFieldBegin('writeId', TType.I64, 3)
- oprot.writeI64(self.writeId)
+ if self.capabilities is not None:
+ oprot.writeFieldBegin('capabilities', TType.STRUCT, 3)
+ self.capabilities.write(oprot)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -12834,64 +12823,14 @@ class HeartbeatWriteIdRequest:
def validate(self):
if self.dbName is None:
raise TProtocol.TProtocolException(message='Required field dbName is unset!')
- if self.tblName is None:
- raise TProtocol.TProtocolException(message='Required field tblName is unset!')
- if self.writeId is None:
- raise TProtocol.TProtocolException(message='Required field writeId is unset!')
return
def __hash__(self):
value = 17
value = (value * 31) ^ hash(self.dbName)
- value = (value * 31) ^ hash(self.tblName)
- value = (value * 31) ^ hash(self.writeId)
- return value
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
-
-class HeartbeatWriteIdResult:
-
- thrift_spec = (
- )
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('HeartbeatWriteIdResult')
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- return
-
-
- def __hash__(self):
- value = 17
+ value = (value * 31) ^ hash(self.tblNames)
+ value = (value * 31) ^ hash(self.capabilities)
return value
def __repr__(self):
@@ -12905,291 +12844,19 @@ class HeartbeatWriteIdResult:
def __ne__(self, other):
return not (self == other)
-class GetValidWriteIdsRequest:
+class GetTablesResult:
"""
Attributes:
- - dbName
- - tblName
+ - tables
"""
thrift_spec = (
None, # 0
- (1, TType.STRING, 'dbName', None, None, ), # 1
- (2, TType.STRING, 'tblName', None, None, ), # 2
+ (1, TType.LIST, 'tables', (TType.STRUCT,(Table, Table.thrift_spec)), None, ), # 1
)
- def __init__(self, dbName=None, tblName=None,):
- self.dbName = dbName
- self.tblName = tblName
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- if fid == 1:
- if ftype == TType.STRING:
- self.dbName = iprot.readString()
- else:
- iprot.skip(ftype)
- elif fid == 2:
- if ftype == TType.STRING:
- self.tblName = iprot.readString()
- else:
- iprot.skip(ftype)
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('GetValidWriteIdsRequest')
- if self.dbName is not None:
- oprot.writeFieldBegin('dbName', TType.STRING, 1)
- oprot.writeString(self.dbName)
- oprot.writeFieldEnd()
- if self.tblName is not None:
- oprot.writeFieldBegin('tblName', TType.STRING, 2)
- oprot.writeString(self.tblName)
- oprot.writeFieldEnd()
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- if self.dbName is None:
- raise TProtocol.TProtocolException(message='Required field dbName is unset!')
- if self.tblName is None:
- raise TProtocol.TProtocolException(message='Required field tblName is unset!')
- return
-
-
- def __hash__(self):
- value = 17
- value = (value * 31) ^ hash(self.dbName)
- value = (value * 31) ^ hash(self.tblName)
- return value
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
-
-class GetValidWriteIdsResult:
- """
- Attributes:
- - lowWatermarkId
- - highWatermarkId
- - areIdsValid
- - ids
- """
-
- thrift_spec = (
- None, # 0
- (1, TType.I64, 'lowWatermarkId', None, None, ), # 1
- (2, TType.I64, 'highWatermarkId', None, None, ), # 2
- (3, TType.BOOL, 'areIdsValid', None, None, ), # 3
- (4, TType.LIST, 'ids', (TType.I64,None), None, ), # 4
- )
-
- def __init__(self, lowWatermarkId=None, highWatermarkId=None, areIdsValid=None, ids=None,):
- self.lowWatermarkId = lowWatermarkId
- self.highWatermarkId = highWatermarkId
- self.areIdsValid = areIdsValid
- self.ids = ids
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- if fid == 1:
- if ftype == TType.I64:
- self.lowWatermarkId = iprot.readI64()
- else:
- iprot.skip(ftype)
- elif fid == 2:
- if ftype == TType.I64:
- self.highWatermarkId = iprot.readI64()
- else:
- iprot.skip(ftype)
- elif fid == 3:
- if ftype == TType.BOOL:
- self.areIdsValid = iprot.readBool()
- else:
- iprot.skip(ftype)
- elif fid == 4:
- if ftype == TType.LIST:
- self.ids = []
- (_etype569, _size566) = iprot.readListBegin()
- for _i570 in xrange(_size566):
- _elem571 = iprot.readI64()
- self.ids.append(_elem571)
- iprot.readListEnd()
- else:
- iprot.skip(ftype)
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('GetValidWriteIdsResult')
- if self.lowWatermarkId is not None:
- oprot.writeFieldBegin('lowWatermarkId', TType.I64, 1)
- oprot.writeI64(self.lowWatermarkId)
- oprot.writeFieldEnd()
- if self.highWatermarkId is not None:
- oprot.writeFieldBegin('highWatermarkId', TType.I64, 2)
- oprot.writeI64(self.highWatermarkId)
- oprot.writeFieldEnd()
- if self.areIdsValid is not None:
- oprot.writeFieldBegin('areIdsValid', TType.BOOL, 3)
- oprot.writeBool(self.areIdsValid)
- oprot.writeFieldEnd()
- if self.ids is not None:
- oprot.writeFieldBegin('ids', TType.LIST, 4)
- oprot.writeListBegin(TType.I64, len(self.ids))
- for iter572 in self.ids:
- oprot.writeI64(iter572)
- oprot.writeListEnd()
- oprot.writeFieldEnd()
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- if self.lowWatermarkId is None:
- raise TProtocol.TProtocolException(message='Required field lowWatermarkId is unset!')
- if self.highWatermarkId is None:
- raise TProtocol.TProtocolException(message='Required field highWatermarkId is unset!')
- return
-
-
- def __hash__(self):
- value = 17
- value = (value * 31) ^ hash(self.lowWatermarkId)
- value = (value * 31) ^ hash(self.highWatermarkId)
- value = (value * 31) ^ hash(self.areIdsValid)
- value = (value * 31) ^ hash(self.ids)
- return value
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
-
-class GetAllFunctionsResponse:
- """
- Attributes:
- - functions
- """
-
- thrift_spec = (
- None, # 0
- (1, TType.LIST, 'functions', (TType.STRUCT,(Function, Function.thrift_spec)), None, ), # 1
- )
-
- def __init__(self, functions=None,):
- self.functions = functions
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- if fid == 1:
- if ftype == TType.LIST:
- self.functions = []
- (_etype576, _size573) = iprot.readListBegin()
- for _i577 in xrange(_size573):
- _elem578 = Function()
- _elem578.read(iprot)
- self.functions.append(_elem578)
- iprot.readListEnd()
- else:
- iprot.skip(ftype)
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('GetAllFunctionsResponse')
- if self.functions is not None:
- oprot.writeFieldBegin('functions', TType.LIST, 1)
- oprot.writeListBegin(TType.STRUCT, len(self.functions))
- for iter579 in self.functions:
- iter579.write(oprot)
- oprot.writeListEnd()
- oprot.writeFieldEnd()
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- return
-
-
- def __hash__(self):
- value = 17
- value = (value * 31) ^ hash(self.functions)
- return value
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
-
-class ClientCapabilities:
- """
- Attributes:
- - values
- """
-
- thrift_spec = (
- None, # 0
- (1, TType.LIST, 'values', (TType.I32,None), None, ), # 1
- )
-
- def __init__(self, values=None,):
- self.values = values
+ def __init__(self, tables=None,):
+ self.tables = tables
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -13202,353 +12869,12 @@ class ClientCapabilities:
break
if fid == 1:
if ftype == TType.LIST:
- self.values = []
- (_etype583, _size580) = iprot.readListBegin()
- for _i584 in xrange(_size580):
- _elem585 = iprot.readI32()
- self.values.append(_elem585)
- iprot.readListEnd()
- else:
- iprot.skip(ftype)
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('ClientCapabilities')
- if self.values is not None:
- oprot.writeFieldBegin('values', TType.LIST, 1)
- oprot.writeListBegin(TType.I32, len(self.values))
- for iter586 in self.values:
- oprot.writeI32(iter586)
- oprot.writeListEnd()
- oprot.writeFieldEnd()
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- if self.values is None:
- raise TProtocol.TProtocolException(message='Required field values is unset!')
- return
-
-
- def __hash__(self):
- value = 17
- value = (value * 31) ^ hash(self.values)
- return value
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
-
-class GetTableRequest:
- """
- Attributes:
- - dbName
- - tblName
- - capabilities
- """
-
- thrift_spec = (
- None, # 0
- (1, TType.STRING, 'dbName', None, None, ), # 1
- (2, TType.STRING, 'tblName', None, None, ), # 2
- (3, TType.STRUCT, 'capabilities', (ClientCapabilities, ClientCapabilities.thrift_spec), None, ), # 3
- )
-
- def __init__(self, dbName=None, tblName=None, capabilities=None,):
- self.dbName = dbName
- self.tblName = tblName
- self.capabilities = capabilities
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- if fid == 1:
- if ftype == TType.STRING:
- self.dbName = iprot.readString()
- else:
- iprot.skip(ftype)
- elif fid == 2:
- if ftype == TType.STRING:
- self.tblName = iprot.readString()
- else:
- iprot.skip(ftype)
- elif fid == 3:
- if ftype == TType.STRUCT:
- self.capabilities = ClientCapabilities()
- self.capabilities.read(iprot)
- else:
- iprot.skip(ftype)
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('GetTableRequest')
- if self.dbName is not None:
- oprot.writeFieldBegin('dbName', TType.STRING, 1)
- oprot.writeString(self.dbName)
- oprot.writeFieldEnd()
- if self.tblName is not None:
- oprot.writeFieldBegin('tblName', TType.STRING, 2)
- oprot.writeString(self.tblName)
- oprot.writeFieldEnd()
- if self.capabilities is not None:
- oprot.writeFieldBegin('capabilities', TType.STRUCT, 3)
- self.capabilities.write(oprot)
- oprot.writeFieldEnd()
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- if self.dbName is None:
- raise TProtocol.TProtocolException(message='Required field dbName is unset!')
- if self.tblName is None:
- raise TProtocol.TProtocolException(message='Required field tblName is unset!')
- return
-
-
- def __hash__(self):
- value = 17
- value = (value * 31) ^ hash(self.dbName)
- value = (value * 31) ^ hash(self.tblName)
- value = (value * 31) ^ hash(self.capabilities)
- return value
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
-
-class GetTableResult:
- """
- Attributes:
- - table
- """
-
- thrift_spec = (
- None, # 0
- (1, TType.STRUCT, 'table', (Table, Table.thrift_spec), None, ), # 1
- )
-
- def __init__(self, table=None,):
- self.table = table
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- if fid == 1:
- if ftype == TType.STRUCT:
- self.table = Table()
- self.table.read(iprot)
- else:
- iprot.skip(ftype)
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('GetTableResult')
- if self.table is not None:
- oprot.writeFieldBegin('table', TType.STRUCT, 1)
- self.table.write(oprot)
- oprot.writeFieldEnd()
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- if self.table is None:
- raise TProtocol.TProtocolException(message='Required field table is unset!')
- return
-
-
- def __hash__(self):
- value = 17
- value = (value * 31) ^ hash(self.table)
- return value
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
-
-class GetTablesRequest:
- """
- Attributes:
- - dbName
- - tblNames
- - capabilities
- """
-
- thrift_spec = (
- None, # 0
- (1, TType.STRING, 'dbName', None, None, ), # 1
- (2, TType.LIST, 'tblNames', (TType.STRING,None), None, ), # 2
- (3, TType.STRUCT, 'capabilities', (ClientCapabilities, ClientCapabilities.thrift_spec), None, ), # 3
- )
-
- def __init__(self, dbName=None, tblNames=None, capabilities=None,):
- self.dbName = dbName
- self.tblNames = tblNames
- self.capabilities = capabilities
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- if fid == 1:
- if ftype == TType.STRING:
- self.dbName = iprot.readString()
- else:
- iprot.skip(ftype)
- elif fid == 2:
- if ftype == TType.LIST:
- self.tblNames = []
+ self.tables = []
(_etype590, _size587) = iprot.readListBegin()
for _i591 in xrange(_size587):
- _elem592 = iprot.readString()
- self.tblNames.append(_elem592)
- iprot.readListEnd()
- else:
- iprot.skip(ftype)
- elif fid == 3:
- if ftype == TType.STRUCT:
- self.capabilities = ClientCapabilities()
- self.capabilities.read(iprot)
- else:
- iprot.skip(ftype)
- else:
- iprot.skip(ftype)
- iprot.readFieldEnd()
- iprot.readStructEnd()
-
- def write(self, oprot):
- if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
- oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
- return
- oprot.writeStructBegin('GetTablesRequest')
- if self.dbName is not None:
- oprot.writeFieldBegin('dbName', TType.STRING, 1)
- oprot.writeString(self.dbName)
- oprot.writeFieldEnd()
- if self.tblNames is not None:
- oprot.writeFieldBegin('tblNames', TType.LIST, 2)
- oprot.writeListBegin(TType.STRING, len(self.tblNames))
- for iter593 in self.tblNames:
- oprot.writeString(iter593)
- oprot.writeListEnd()
- oprot.writeFieldEnd()
- if self.capabilities is not None:
- oprot.writeFieldBegin('capabilities', TType.STRUCT, 3)
- self.capabilities.write(oprot)
- oprot.writeFieldEnd()
- oprot.writeFieldStop()
- oprot.writeStructEnd()
-
- def validate(self):
- if self.dbName is None:
- raise TProtocol.TProtocolException(message='Required field dbName is unset!')
- return
-
-
- def __hash__(self):
- value = 17
- value = (value * 31) ^ hash(self.dbName)
- value = (value * 31) ^ hash(self.tblNames)
- value = (value * 31) ^ hash(self.capabilities)
- return value
-
- def __repr__(self):
- L = ['%s=%r' % (key, value)
- for key, value in self.__dict__.iteritems()]
- return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
-
- def __eq__(self, other):
- return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
-
- def __ne__(self, other):
- return not (self == other)
-
-class GetTablesResult:
- """
- Attributes:
- - tables
- """
-
- thrift_spec = (
- None, # 0
- (1, TType.LIST, 'tables', (TType.STRUCT,(Table, Table.thrift_spec)), None, ), # 1
- )
-
- def __init__(self, tables=None,):
- self.tables = tables
-
- def read(self, iprot):
- if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
- fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
- return
- iprot.readStructBegin()
- while True:
- (fname, ftype, fid) = iprot.readFieldBegin()
- if ftype == TType.STOP:
- break
- if fid == 1:
- if ftype == TType.LIST:
- self.tables = []
- (_etype597, _size594) = iprot.readListBegin()
- for _i598 in xrange(_size594):
- _elem599 = Table()
- _elem599.read(iprot)
- self.tables.append(_elem599)
+ _elem592 = Table()
+ _elem592.read(iprot)
+ self.tables.append(_elem592)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -13565,8 +12891,8 @@ class GetTablesResult:
if self.tables is not None:
oprot.writeFieldBegin('tables', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.tables))
- for iter600 in self.tables:
- iter600.write(oprot)
+ for iter593 in self.tables:
+ iter593.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 5e18f9b..f411dfa 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -736,8 +736,6 @@ class Table
PRIVILEGES = 13
TEMPORARY = 14
REWRITEENABLED = 15
- MMNEXTWRITEID = 16
- MMWATERMARKWRITEID = 17
FIELDS = {
TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
@@ -754,9 +752,7 @@ class Table
TABLETYPE => {:type => ::Thrift::Types::STRING, :name => 'tableType'},
PRIVILEGES => {:type => ::Thrift::Types::STRUCT, :name => 'privileges', :class => ::PrincipalPrivilegeSet, :optional => true},
TEMPORARY => {:type => ::Thrift::Types::BOOL, :name => 'temporary', :default => false, :optional => true},
- REWRITEENABLED => {:type => ::Thrift::Types::BOOL, :name => 'rewriteEnabled', :optional => true},
- MMNEXTWRITEID => {:type => ::Thrift::Types::I64, :name => 'mmNextWriteId', :optional => true},
- MMWATERMARKWRITEID => {:type => ::Thrift::Types::I64, :name => 'mmWatermarkWriteId', :optional => true}
+ REWRITEENABLED => {:type => ::Thrift::Types::BOOL, :name => 'rewriteEnabled', :optional => true}
}
def struct_fields; FIELDS; end
@@ -2809,166 +2805,6 @@ class CacheFileMetadataRequest
::Thrift::Struct.generate_accessors self
end
-class GetNextWriteIdRequest
- include ::Thrift::Struct, ::Thrift::Struct_Union
- DBNAME = 1
- TBLNAME = 2
-
- FIELDS = {
- DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
- TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tblName is unset!') unless @tblName
- end
-
- ::Thrift::Struct.generate_accessors self
-end
-
-class GetNextWriteIdResult
- include ::Thrift::Struct, ::Thrift::Struct_Union
- WRITEID = 1
-
- FIELDS = {
- WRITEID => {:type => ::Thrift::Types::I64, :name => 'writeId'}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field writeId is unset!') unless @writeId
- end
-
- ::Thrift::Struct.generate_accessors self
-end
-
-class FinalizeWriteIdRequest
- include ::Thrift::Struct, ::Thrift::Struct_Union
- DBNAME = 1
- TBLNAME = 2
- WRITEID = 3
- COMMIT = 4
-
- FIELDS = {
- DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
- TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'},
- WRITEID => {:type => ::Thrift::Types::I64, :name => 'writeId'},
- COMMIT => {:type => ::Thrift::Types::BOOL, :name => 'commit'}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tblName is unset!') unless @tblName
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field writeId is unset!') unless @writeId
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field commit is unset!') if @commit.nil?
- end
-
- ::Thrift::Struct.generate_accessors self
-end
-
-class FinalizeWriteIdResult
- include ::Thrift::Struct, ::Thrift::Struct_Union
-
- FIELDS = {
-
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
-end
-
-class HeartbeatWriteIdRequest
- include ::Thrift::Struct, ::Thrift::Struct_Union
- DBNAME = 1
- TBLNAME = 2
- WRITEID = 3
-
- FIELDS = {
- DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
- TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'},
- WRITEID => {:type => ::Thrift::Types::I64, :name => 'writeId'}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tblName is unset!') unless @tblName
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field writeId is unset!') unless @writeId
- end
-
- ::Thrift::Struct.generate_accessors self
-end
-
-class HeartbeatWriteIdResult
- include ::Thrift::Struct, ::Thrift::Struct_Union
-
- FIELDS = {
-
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
-end
-
-class GetValidWriteIdsRequest
- include ::Thrift::Struct, ::Thrift::Struct_Union
- DBNAME = 1
- TBLNAME = 2
-
- FIELDS = {
- DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
- TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tblName is unset!') unless @tblName
- end
-
- ::Thrift::Struct.generate_accessors self
-end
-
-class GetValidWriteIdsResult
- include ::Thrift::Struct, ::Thrift::Struct_Union
- LOWWATERMARKID = 1
- HIGHWATERMARKID = 2
- AREIDSVALID = 3
- IDS = 4
-
- FIELDS = {
- LOWWATERMARKID => {:type => ::Thrift::Types::I64, :name => 'lowWatermarkId'},
- HIGHWATERMARKID => {:type => ::Thrift::Types::I64, :name => 'highWatermarkId'},
- AREIDSVALID => {:type => ::Thrift::Types::BOOL, :name => 'areIdsValid', :optional => true},
- IDS => {:type => ::Thrift::Types::LIST, :name => 'ids', :element => {:type => ::Thrift::Types::I64}, :optional => true}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field lowWatermarkId is unset!') unless @lowWatermarkId
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field highWatermarkId is unset!') unless @highWatermarkId
- end
-
- ::Thrift::Struct.generate_accessors self
-end
-
class GetAllFunctionsResponse
include ::Thrift::Struct, ::Thrift::Struct_Union
FUNCTIONS = 1
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index 36be2e8..04e63f3 100644
--- a/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -2562,66 +2562,6 @@ module ThriftHiveMetastore
raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'cache_file_metadata failed: unknown result')
end
- def get_next_write_id(req)
- send_get_next_write_id(req)
- return recv_get_next_write_id()
- end
-
- def send_get_next_write_id(req)
- send_message('get_next_write_id', Get_next_write_id_args, :req => req)
- end
-
- def recv_get_next_write_id()
- result = receive_message(Get_next_write_id_result)
- return result.success unless result.success.nil?
- raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_next_write_id failed: unknown result')
- end
-
- def finalize_write_id(req)
- send_finalize_write_id(req)
- return recv_finalize_write_id()
- end
-
- def send_finalize_write_id(req)
- send_message('finalize_write_id', Finalize_write_id_args, :req => req)
- end
-
- def recv_finalize_write_id()
- result = receive_message(Finalize_write_id_result)
- return result.success unless result.success.nil?
- raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'finalize_write_id failed: unknown result')
- end
-
- def heartbeat_write_id(req)
- send_heartbeat_write_id(req)
- return recv_heartbeat_write_id()
- end
-
- def send_heartbeat_write_id(req)
- send_message('heartbeat_write_id', Heartbeat_write_id_args, :req => req)
- end
-
- def recv_heartbeat_write_id()
- result = receive_message(Heartbeat_write_id_result)
- return result.success unless result.success.nil?
- raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'heartbeat_write_id failed: unknown result')
- end
-
- def get_valid_write_ids(req)
- send_get_valid_write_ids(req)
- return recv_get_valid_write_ids()
- end
-
- def send_get_valid_write_ids(req)
- send_message('get_valid_write_ids', Get_valid_write_ids_args, :req => req)
- end
-
- def recv_get_valid_write_ids()
- result = receive_message(Get_valid_write_ids_result)
- return result.success unless result.success.nil?
- raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_valid_write_ids failed: unknown result')
- end
-
end
class Processor < ::FacebookService::Processor
@@ -4517,34 +4457,6 @@ module ThriftHiveMetastore
write_result(result, oprot, 'cache_file_metadata', seqid)
end
- def process_get_next_write_id(seqid, iprot, oprot)
- args = read_args(iprot, Get_next_write_id_args)
- result = Get_next_write_id_result.new()
- result.success = @handler.get_next_write_id(args.req)
- write_result(result, oprot, 'get_next_write_id', seqid)
- end
-
- def process_finalize_write_id(seqid, iprot, oprot)
- args = read_args(iprot, Finalize_write_id_args)
- result = Finalize_write_id_result.new()
- result.success = @handler.finalize_write_id(args.req)
- write_result(result, oprot, 'finalize_write_id', seqid)
- end
-
- def process_heartbeat_write_id(seqid, iprot, oprot)
- args = read_args(iprot, Heartbeat_write_id_args)
- result = Heartbeat_write_id_result.new()
- result.success = @handler.heartbeat_write_id(args.req)
- write_result(result, oprot, 'heartbeat_write_id', seqid)
- end
-
- def process_get_valid_write_ids(seqid, iprot, oprot)
- args = read_args(iprot, Get_valid_write_ids_args)
- result = Get_valid_write_ids_result.new()
- result.success = @handler.get_valid_write_ids(args.req)
- write_result(result, oprot, 'get_valid_write_ids', seqid)
- end
-
end
# HELPER FUNCTIONS AND STRUCTURES
@@ -10319,133 +10231,5 @@ module ThriftHiveMetastore
::Thrift::Struct.generate_accessors self
end
- class Get_next_write_id_args
- include ::Thrift::Struct, ::Thrift::Struct_Union
- REQ = 1
-
- FIELDS = {
- REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => ::GetNextWriteIdRequest}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
- end
-
- class Get_next_write_id_result
- include ::Thrift::Struct, ::Thrift::Struct_Union
- SUCCESS = 0
-
- FIELDS = {
- SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::GetNextWriteIdResult}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
- end
-
- class Finalize_write_id_args
- include ::Thrift::Struct, ::Thrift::Struct_Union
- REQ = 1
-
- FIELDS = {
- REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => ::FinalizeWriteIdRequest}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
- end
-
- class Finalize_write_id_result
- include ::Thrift::Struct, ::Thrift::Struct_Union
- SUCCESS = 0
-
- FIELDS = {
- SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::FinalizeWriteIdResult}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
- end
-
- class Heartbeat_write_id_args
- include ::Thrift::Struct, ::Thrift::Struct_Union
- REQ = 1
-
- FIELDS = {
- REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => ::HeartbeatWriteIdRequest}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
- end
-
- class Heartbeat_write_id_result
- include ::Thrift::Struct, ::Thrift::Struct_Union
- SUCCESS = 0
-
- FIELDS = {
- SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::HeartbeatWriteIdResult}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
- end
-
- class Get_valid_write_ids_args
- include ::Thrift::Struct, ::Thrift::Struct_Union
- REQ = 1
-
- FIELDS = {
- REQ => {:type => ::Thrift::Types::STRUCT, :name => 'req', :class => ::GetValidWriteIdsRequest}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
- end
-
- class Get_valid_write_ids_result
- include ::Thrift::Struct, ::Thrift::Struct_Union
- SUCCESS = 0
-
- FIELDS = {
- SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::GetValidWriteIdsResult}
- }
-
- def struct_fields; FIELDS; end
-
- def validate
- end
-
- ::Thrift::Struct.generate_accessors self
- end
-
end
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index ff3505a..504946a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -41,7 +41,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.Callable;
@@ -83,7 +82,6 @@ import org.apache.hadoop.hive.common.metrics.common.MetricsVariable;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.io.HdfsUtils;
-import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.events.AddIndexEvent;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
@@ -120,7 +118,6 @@ import org.apache.hadoop.hive.metastore.events.PreReadDatabaseEvent;
import org.apache.hadoop.hive.metastore.events.PreReadTableEvent;
import org.apache.hadoop.hive.metastore.filemeta.OrcFileMetadataHandler;
import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
-import org.apache.hadoop.hive.metastore.model.MTableWrite;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -157,9 +154,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.commons.lang.StringUtils.join;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_COMMENT;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.DEFAULT_DATABASE_NAME;
-import static org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName;
+
import com.facebook.fb303.FacebookBase;
import com.facebook.fb303.fb_status;
import com.google.common.annotations.VisibleForTesting;
@@ -6915,216 +6910,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
throw newMetaException(e);
}
}
-
- private final Random random = new Random();
- @Override
- public GetNextWriteIdResult get_next_write_id(GetNextWriteIdRequest req) throws TException {
- RawStore ms = getMS();
- String dbName = HiveStringUtils.normalizeIdentifier(req.getDbName()),
- tblName = HiveStringUtils.normalizeIdentifier(req.getTblName());
- startFunction("get_next_write_id", " : db=" + dbName + " tbl=" + tblName);
- Exception exception = null;
- long writeId = -1;
- try {
- int deadlockTryCount = 10;
- int deadlockRetryBackoffMs = 200;
- while (deadlockTryCount > 0) {
- boolean ok = false;
- ms.openTransaction();
- try {
- Table tbl = ms.getTable(dbName, tblName);
- if (tbl == null) {
- throw new NoSuchObjectException(dbName + "." + tblName);
- }
- writeId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0;
- tbl.setMmNextWriteId(writeId + 1);
- ms.alterTable(dbName, tblName, tbl);
-
- ok = true;
- } finally {
- if (!ok) {
- ms.rollbackTransaction();
- // Exception should propagate; don't override it by breaking out of the loop.
- } else {
- Boolean commitResult = ms.commitTransactionExpectDeadlock();
- if (commitResult != null) {
- if (commitResult) break; // Assume no exception; ok to break out of the loop.
- throw new MetaException("Failed to commit");
- }
- }
- }
- LOG.warn("Getting the next write ID failed due to a deadlock; retrying");
- Thread.sleep(random.nextInt(deadlockRetryBackoffMs));
- }
-
- // Do a separate txn after we have reserved the number.
- boolean ok = false;
- ms.openTransaction();
- try {
- Table tbl = ms.getTable(dbName, tblName);
- ms.createTableWrite(tbl, writeId, MM_WRITE_OPEN, System.currentTimeMillis());
- ok = true;
- } finally {
- commitOrRollback(ms, ok);
- }
- } catch (Exception e) {
- exception = e;
- throwMetaException(e);
- } finally {
- endFunction("get_next_write_id", exception == null, exception, tblName);
- }
- return new GetNextWriteIdResult(writeId);
- }
-
- @Override
- public FinalizeWriteIdResult finalize_write_id(FinalizeWriteIdRequest req) throws TException {
- RawStore ms = getMS();
- String dbName = HiveStringUtils.normalizeIdentifier(req.getDbName()),
- tblName = HiveStringUtils.normalizeIdentifier(req.getTblName());
- long writeId = req.getWriteId();
- boolean commit = req.isCommit();
- startFunction("finalize_write_id", " : db=" + dbName + " tbl=" + tblName
- + " writeId=" + writeId + " commit=" + commit);
- Exception ex = null;
- try {
- boolean ok = false;
- ms.openTransaction();
- try {
- MTableWrite tw = getActiveTableWrite(ms, dbName, tblName, writeId);
- if (tw == null) {
- throw new MetaException("Write ID " + writeId + " for " + dbName + "." + tblName
- + " does not exist or is not active");
- }
- tw.setState(String.valueOf(commit ? MM_WRITE_COMMITTED : MM_WRITE_ABORTED));
- ms.updateTableWrite(tw);
- ok = true;
- } finally {
- commitOrRollback(ms, ok);
- }
- } catch (Exception e) {
- ex = e;
- throwMetaException(e);
- } finally {
- endFunction("finalize_write_id", ex == null, ex, tblName);
- }
- return new FinalizeWriteIdResult();
- }
-
- private void commitOrRollback(RawStore ms, boolean ok) throws MetaException {
- if (ok) {
- if (!ms.commitTransaction()) throw new MetaException("Failed to commit");
- } else {
- ms.rollbackTransaction();
- }
- }
-
- @Override
- public HeartbeatWriteIdResult heartbeat_write_id(HeartbeatWriteIdRequest req)
- throws TException {
- RawStore ms = getMS();
- String dbName = HiveStringUtils.normalizeIdentifier(req.getDbName()),
- tblName = HiveStringUtils.normalizeIdentifier(req.getTblName());
- long writeId = req.getWriteId();
- startFunction("heartbeat_write_id", " : db="
- + dbName + " tbl=" + tblName + " writeId=" + writeId);
- Exception ex = null;
- boolean wasAborted = false;
- try {
- boolean ok = false;
- ms.openTransaction();
- try {
- MTableWrite tw = getActiveTableWrite(ms, dbName, tblName, writeId);
- long absTimeout = HiveConf.getTimeVar(getConf(),
- ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT, TimeUnit.MILLISECONDS);
- if (tw.getCreated() + absTimeout < System.currentTimeMillis()) {
- tw.setState(String.valueOf(MM_WRITE_ABORTED));
- wasAborted = true;
- }
- tw.setLastHeartbeat(System.currentTimeMillis());
- ms.updateTableWrite(tw);
- ok = true;
- } finally {
- commitOrRollback(ms, ok);
- }
- } catch (Exception e) {
- ex = e;
- throwMetaException(e);
- } finally {
- endFunction("heartbeat_write_id", ex == null, ex, tblName);
- }
- if (wasAborted) throw new MetaException("The write was aborted due to absolute timeout");
- return new HeartbeatWriteIdResult();
- }
-
- private MTableWrite getActiveTableWrite(RawStore ms, String dbName,
- String tblName, long writeId) throws MetaException {
- MTableWrite tw = ms.getTableWrite(dbName, tblName, writeId);
- if (tw == null) {
- return null;
- }
- assert tw.getState().length() == 1;
- char state = tw.getState().charAt(0);
- if (state != MM_WRITE_OPEN) {
- throw new MetaException("Invalid write state: " + state);
- }
- return tw;
- }
-
- @Override
- public GetValidWriteIdsResult get_valid_write_ids(
- GetValidWriteIdsRequest req) throws TException {
- RawStore ms = getMS();
- String dbName = req.getDbName(), tblName = req.getTblName();
- startFunction("get_valid_write_ids", " : db=" + dbName + " tbl=" + tblName);
- GetValidWriteIdsResult result = new GetValidWriteIdsResult();
- Exception ex = null;
- try {
- boolean ok = false;
- ms.openTransaction();
- try {
- Table tbl = ms.getTable(dbName, tblName);
- if (tbl == null) {
- throw new InvalidObjectException(dbName + "." + tblName);
- }
- long nextId = tbl.isSetMmNextWriteId() ? tbl.getMmNextWriteId() : 0;
- long watermarkId = tbl.isSetMmWatermarkWriteId() ? tbl.getMmWatermarkWriteId() : -1;
- if (nextId > (watermarkId + 1)) {
- // There may be some intermediate failed or active writes; get the valid ones.
- List<Long> ids = ms.getTableWriteIds(
- dbName, tblName, watermarkId, nextId, MM_WRITE_COMMITTED);
- // TODO: we could optimize here and send the smaller of the lists, and also use ranges
- if (!ids.isEmpty()) {
- Iterator<Long> iter = ids.iterator();
- long oldWatermarkId = watermarkId;
- while (iter.hasNext()) {
- Long nextWriteId = iter.next();
- if (nextWriteId != watermarkId + 1) break;
- ++watermarkId;
- }
- long removed = watermarkId - oldWatermarkId;
- if (removed > 0) {
- ids = ids.subList((int)removed, ids.size());
- }
- if (!ids.isEmpty()) {
- result.setIds(ids);
- result.setAreIdsValid(true);
- }
- }
- }
- result.setHighWatermarkId(nextId);
- result.setLowWatermarkId(watermarkId);
- ok = true;
- } finally {
- commitOrRollback(ms, ok);
- }
- } catch (Exception e) {
- ex = e;
- throwMetaException(e);
- } finally {
- endFunction("get_valid_write_ids", ex == null, ex, tblName);
- }
- return result;
- }
}
@@ -7598,7 +7383,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
startCompactorInitiator(conf);
startCompactorWorkers(conf);
startCompactorCleaner(conf);
- startMmHousekeepingThread(conf);
startHouseKeeperService(conf);
} catch (Throwable e) {
LOG.error("Failure when starting the compactor, compactions may not happen, " +
@@ -7640,16 +7424,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
}
- private static void startMmHousekeepingThread(HiveConf conf) throws Exception {
- long intervalMs = HiveConf.getTimeVar(conf,
- ConfVars.HIVE_METASTORE_MM_THREAD_SCAN_INTERVAL, TimeUnit.MILLISECONDS);
- if (intervalMs > 0) {
- MetaStoreThread thread = new MmCleanerThread(intervalMs);
- initializeAndStartThread(thread, conf);
- }
- }
-
-
private static MetaStoreThread instantiateThread(String classname) throws Exception {
Class<?> c = Class.forName(classname);
Object o = c.newInstance();
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 4912a31..0d8a76a 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -2539,27 +2539,4 @@ public class HiveMetaStoreClient implements IMetaStoreClient {
CacheFileMetadataResult result = client.cache_file_metadata(req);
return result.isIsSupported();
}
-
- @Override
- public long getNextTableWriteId(String dbName, String tableName) throws TException {
- return client.get_next_write_id(new GetNextWriteIdRequest(dbName, tableName)).getWriteId();
- }
-
- @Override
- public void finalizeTableWrite(
- String dbName, String tableName, long writeId, boolean commit) throws TException {
- client.finalize_write_id(new FinalizeWriteIdRequest(dbName, tableName, writeId, commit));
- }
-
- @Override
- public void heartbeatTableWrite(
- String dbName, String tableName, long writeId) throws TException {
- client.heartbeat_write_id(new HeartbeatWriteIdRequest(dbName, tableName, writeId));
- }
-
- @Override
- public GetValidWriteIdsResult getValidWriteIds(
- String dbName, String tableName) throws TException {
- return client.get_valid_write_ids(new GetValidWriteIdsRequest(dbName, tableName));
- }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 82db281..023a289 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -56,7 +56,6 @@ import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResult;
import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
@@ -1665,13 +1664,4 @@ public interface IMetaStoreClient {
void addForeignKey(List<SQLForeignKey> foreignKeyCols) throws
MetaException, NoSuchObjectException, TException;
-
- long getNextTableWriteId(String dbName, String tableName) throws TException;
-
- void heartbeatTableWrite(String dbName, String tableName, long writeId) throws TException;
-
- void finalizeTableWrite(String dbName, String tableName, long writeId,
- boolean commit) throws TException;
-
- GetValidWriteIdsResult getValidWriteIds(String dbName, String tableName) throws TException;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/77511070/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
deleted file mode 100644
index d99b0d7..0000000
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidWriteIds;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.RawStore.FullTableName;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.model.MTableWrite;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Supplier;
-
-public class MmCleanerThread extends Thread implements MetaStoreThread {
- private final static Logger LOG = LoggerFactory.getLogger(MmCleanerThread.class);
- private HiveConf conf;
- private int threadId;
- private AtomicBoolean stop;
- private long intervalMs;
- private long heartbeatTimeoutMs, absTimeoutMs, abortedGraceMs;
- /** Time override for tests. Only used for MM timestamp logic, not for the thread timing. */
- private Supplier<Long> timeOverride = null;
-
- public MmCleanerThread(long intervalMs) {
- this.intervalMs = intervalMs;
- }
-
- @VisibleForTesting
- void overrideTime(Supplier<Long> timeOverride) {
- this.timeOverride = timeOverride;
- }
-
- private long getTimeMs() {
- return timeOverride == null ? System.currentTimeMillis() : timeOverride.get();
- }
-
- @Override
- public void setHiveConf(HiveConf conf) {
- this.conf = conf;
- heartbeatTimeoutMs = HiveConf.getTimeVar(
- conf, ConfVars.HIVE_METASTORE_MM_HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);
- absTimeoutMs = HiveConf.getTimeVar(
- conf, ConfVars.HIVE_METASTORE_MM_ABSOLUTE_TIMEOUT, TimeUnit.MILLISECONDS);
- abortedGraceMs = HiveConf.getTimeVar(
- conf, ConfVars.HIVE_METASTORE_MM_ABORTED_GRACE_PERIOD, TimeUnit.MILLISECONDS);
- if (heartbeatTimeoutMs > absTimeoutMs) {
- throw new RuntimeException("Heartbeat timeout " + heartbeatTimeoutMs
- + " cannot be larger than the absolute timeout " + absTimeoutMs);
- }
- }
-
- @Override
- public void setThreadId(int threadId) {
- this.threadId = threadId;
- }
-
- @Override
- public void init(AtomicBoolean stop, AtomicBoolean looped) throws MetaException {
- this.stop = stop;
- setPriority(MIN_PRIORITY);
- setDaemon(true);
- }
-
- @Override
- public void run() {
- // Only get RS here, when we are already on the thread.
- RawStore rs = getRs();
- while (true) {
- if (checkStop()) return;
- long endTimeNs = System.nanoTime() + intervalMs * 1000000L;
-
- runOneIteration(rs);
-
- if (checkStop()) return;
- long waitTimeMs = (endTimeNs - System.nanoTime()) / 1000000L;
- if (waitTimeMs <= 0) continue;
- try {
- Thread.sleep(waitTimeMs);
- } catch (InterruptedException e) {
- LOG.error("Thread was interrupted and will now exit");
- return;
- }
- }
- }
-
- private RawStore getRs() {
- try {
- return RawStoreProxy.getProxy(conf, conf,
- conf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL), threadId);
- } catch (MetaException e) {
- LOG.error("Failed to get RawStore; the thread will now die", e);
- throw new RuntimeException(e);
- }
- }
-
- private boolean checkStop() {
- if (!stop.get()) return false;
- LOG.info("Stopping due to an external request");
- return true;
- }
-
- @VisibleForTesting
- void runOneIteration(RawStore rs) {
- // We only get the names here; we want to get and process each table in a separate DB txn.
- List<FullTableName> mmTables = null;
- try {
- mmTables = rs.getAllMmTablesForCleanup();
- } catch (MetaException e) {
- LOG.error("Failed to get tables", e);
- return;
- }
- for (FullTableName tableName : mmTables) {
- try {
- processOneTable(tableName, rs);
- } catch (MetaException e) {
- LOG.error("Failed to process " + tableName, e);
- }
- }
- }
-
- private void processOneTable(FullTableName table, RawStore rs) throws MetaException {
- // 1. Time out writes that have been running for a while.
- // a) Heartbeat timeouts (not enabled right now as heartbeat is not implemented).
- // b) Absolute timeouts.
- // c) Gaps that have the next ID and the derived absolute timeout. This is a small special
- // case that can happen if we increment next ID but fail to insert the write ID record,
- // which we do in separate txns to avoid making the conflict-prone increment txn longer.
- LOG.info("Processing table " + table);
- Table t = rs.getTable(table.dbName, table.tblName);
- HashSet<Long> removeWriteIds = new HashSet<>(), cleanupOnlyWriteIds = new HashSet<>();
- getWritesThatReadyForCleanUp(t, table, rs, removeWriteIds, cleanupOnlyWriteIds);
-
- // 2. Delete the aborted writes' files from the FS.
- deleteAbortedWriteIdFiles(table, rs, t, removeWriteIds);
- deleteAbortedWriteIdFiles(table, rs, t, cleanupOnlyWriteIds);
- // removeWriteIds-s now only contains the writes that were fully cleaned up after.
-
- // 3. Advance the watermark.
- advanceWatermark(table, rs, removeWriteIds);
- }
-
- private void getWritesThatReadyForCleanUp(Table t, FullTableName table, RawStore rs,
- HashSet<Long> removeWriteIds, HashSet<Long> cleanupOnlyWriteIds) throws MetaException {
- // We will generally ignore errors here. First, we expect some conflicts; second, we will get
- // the final view of things after we do (or try, at any rate) all the updates.
- long watermarkId = t.isSetMmWatermarkWriteId() ? t.getMmWatermarkWriteId() : -1,
- nextWriteId = t.isSetMmNextWriteId() ? t.getMmNextWriteId() : 0;
- long now = getTimeMs(), earliestOkHeartbeatMs = now - heartbeatTimeoutMs,
- earliestOkCreateMs = now - absTimeoutMs, latestAbortedMs = now - abortedGraceMs;
-
- List<MTableWrite> writes = rs.getTableWrites(
- table.dbName, table.tblName, watermarkId, nextWriteId);
- ListIterator<MTableWrite> iter = writes.listIterator(writes.size());
- long expectedId = -1, nextCreated = -1;
- // We will go in reverse order and add aborted writes for the gaps that have a following
- // write ID that would imply that the previous one (created earlier) would have already
- // expired, had it been open and not updated.
- while (iter.hasPrevious()) {
- MTableWrite write = iter.previous();
- addTimedOutMissingWriteIds(rs, table.dbName, table.tblName, write.getWriteId(),
- nextCreated, expectedId, earliestOkHeartbeatMs, cleanupOnlyWriteIds, now);
- expectedId = write.getWriteId() - 1;
- nextCreated = write.getCreated();
- char state = write.getState().charAt(0);
- if (state == HiveMetaStore.MM_WRITE_ABORTED) {
- if (write.getLastHeartbeat() < latestAbortedMs) {
- removeWriteIds.add(write.getWriteId());
- } else {
- cleanupOnlyWriteIds.add(write.getWriteId());
- }
- } else if (state == HiveMetaStore.MM_WRITE_OPEN && write.getCreated() < earliestOkCreateMs) {
- // TODO: also check for heartbeat here.
- if (expireTimedOutWriteId(rs, table.dbName, table.tblName, write.getWriteId(),
- now, earliestOkCreateMs, earliestOkHeartbeatMs, cleanupOnlyWriteIds)) {
- cleanupOnlyWriteIds.add(write.getWriteId());
- }
- }
- }
- addTimedOutMissingWriteIds(rs, table.dbName, table.tblName, watermarkId,
- nextCreated, expectedId, earliestOkHeartbeatMs, cleanupOnlyWriteIds, now);
- }
-
- private void advanceWatermark(
- FullTableName table, RawStore rs, HashSet<Long> cleanedUpWriteIds) {
- if (!rs.openTransaction()) {
- LOG.error("Cannot open transaction");
- return;
- }
- boolean success = false;
- try {
- Table t = rs.getTable(table.dbName, table.tblName);
- if (t == null) {
- return;
- }
- long watermarkId = t.getMmWatermarkWriteId();
- List<Long> writeIds = rs.getTableWriteIds(table.dbName, table.tblName, watermarkId,
- t.getMmNextWriteId(), HiveMetaStore.MM_WRITE_COMMITTED);
- long expectedId = watermarkId + 1;
- boolean hasGap = false;
- Iterator<Long> idIter = writeIds.iterator();
- while (idIter.hasNext()) {
- long next = idIter.next();
- if (next < expectedId) continue;
- while (next > expectedId) {
- if (!cleanedUpWriteIds.contains(expectedId)) {
- hasGap = true;
- break;
- }
- ++expectedId;
- }
- if (hasGap) break;
- ++expectedId;
- }
- // Make sure we also advance over the trailing aborted ones.
- if (!hasGap) {
- while (cleanedUpWriteIds.contains(expectedId)) {
- ++expectedId;
- }
- }
- long newWatermarkId = expectedId - 1;
- if (newWatermarkId > watermarkId) {
- t.setMmWatermarkWriteId(newWatermarkId);
- rs.alterTable(table.dbName, table.tblName, t);
- rs.deleteTableWrites(table.dbName, table.tblName, -1, expectedId);
- }
- success = true;
- } catch (Exception ex) {
- // TODO: should we try a couple times on conflicts? Aborted writes cannot be unaborted.
- LOG.error("Failed to advance watermark", ex);
- rs.rollbackTransaction();
- }
- if (success) {
- tryCommit(rs);
- }
- }
-
- private void deleteAbortedWriteIdFiles(
- FullTableName table, RawStore rs, Table t, HashSet<Long> cleanUpWriteIds) {
- if (cleanUpWriteIds.isEmpty()) return;
- if (t.getPartitionKeysSize() > 0) {
- for (String location : rs.getAllPartitionLocations(table.dbName, table.tblName)) {
- deleteAbortedWriteIdFiles(location, cleanUpWriteIds);
- }
- } else {
- deleteAbortedWriteIdFiles(t.getSd().getLocation(), cleanUpWriteIds);
- }
- }
-
- private void deleteAbortedWriteIdFiles(String location, HashSet<Long> abortedWriteIds) {
- LOG.info("Looking for " + abortedWriteIds.size() + " aborted write output in " + location);
- Path path = new Path(location);
- FileSystem fs;
- FileStatus[] files;
- try {
- fs = path.getFileSystem(conf);
- if (!fs.exists(path)) {
- LOG.warn(path + " does not exist; assuming that the cleanup is not needed.");
- return;
- }
- // TODO# this doesn't account for list bucketing. Do nothing now, ACID will solve all problems.
- files = fs.listStatus(path);
- } catch (Exception ex) {
- LOG.error("Failed to get files for " + path + "; cannot ensure cleanup for any writes");
- abortedWriteIds.clear();
- return;
- }
- for (FileStatus file : files) {
- Path childPath = file.getPath();
- if (!file.isDirectory()) {
- LOG.warn("Skipping a non-directory file " + childPath);
- continue;
- }
- Long writeId = ValidWriteIds.extractWriteId(childPath);
- if (writeId == null) {
- LOG.warn("Skipping an unknown directory " + childPath);
- continue;
- }
- if (!abortedWriteIds.contains(writeId.longValue())) continue;
- try {
- if (!fs.delete(childPath, true)) throw new IOException("delete returned false");
- } catch (Exception ex) {
- LOG.error("Couldn't delete " + childPath + "; not cleaning up " + writeId, ex);
- abortedWriteIds.remove(writeId.longValue());
- }
- }
- }
-
- private boolean expireTimedOutWriteId(RawStore rs, String dbName,
- String tblName, long writeId, long now, long earliestOkCreatedMs,
- long earliestOkHeartbeatMs, HashSet<Long> cleanupOnlyWriteIds) {
- if (!rs.openTransaction()) {
- return false;
- }
- try {
- MTableWrite tw = rs.getTableWrite(dbName, tblName, writeId);
- if (tw == null) {
- // The write have been updated since the time when we thought it has expired.
- tryCommit(rs);
- return true;
- }
- char state = tw.getState().charAt(0);
- if (state != HiveMetaStore.MM_WRITE_OPEN
- || (tw.getCreated() > earliestOkCreatedMs
- && tw.getLastHeartbeat() > earliestOkHeartbeatMs)) {
- tryCommit(rs);
- return true; // The write has been updated since the time when we thought it has expired.
- }
- tw.setState(String.valueOf(HiveMetaStore.MM_WRITE_ABORTED));
- tw.setLastHeartbeat(now);
- rs.updateTableWrite(tw);
- } catch (Exception ex) {
- LOG.error("Failed to update an expired table write", ex);
- rs.rollbackTransaction();
- return false;
- }
- boolean result = tryCommit(rs);
- if (result) {
- cleanupOnlyWriteIds.add(writeId);
- }
- return result;
- }
-
- private boolean tryCommit(RawStore rs) {
- try {
- return rs.commitTransaction();
- } catch (Exception ex) {
- LOG.error("Failed to commit transaction", ex);
- return false;
- }
- }
-
- private boolean addTimedOutMissingWriteIds(RawStore rs, String dbName, String tblName,
- long foundPrevId, long nextCreated, long expectedId, long earliestOkHeartbeatMs,
- HashSet<Long> cleanupOnlyWriteIds, long now) throws MetaException {
- // Assume all missing ones are created at the same time as the next present write ID.
- // We also assume missing writes never had any heartbeats.
- if (nextCreated >= earliestOkHeartbeatMs || expectedId < 0) return true;
- Table t = null;
- List<Long> localCleanupOnlyWriteIds = new ArrayList<>();
- while (foundPrevId < expectedId) {
- if (t == null && !rs.openTransaction()) {
- LOG.error("Cannot open transaction; skipping");
- return false;
- }
- try {
- if (t == null) {
- t = rs.getTable(dbName, tblName);
- }
- // We don't need to double check if the write exists; the unique index will cause an error.
- rs.createTableWrite(t, expectedId, HiveMetaStore.MM_WRITE_ABORTED, now);
- } catch (Exception ex) {
- // TODO: don't log conflict exceptions?.. although we barely ever expect them.
- LOG.error("Failed to create a missing table write", ex);
- rs.rollbackTransaction();
- return false;
- }
- localCleanupOnlyWriteIds.add(expectedId);
- --expectedId;
- }
- boolean result = (t == null || tryCommit(rs));
- if (result) {
- cleanupOnlyWriteIds.addAll(localCleanupOnlyWriteIds);
- }
- return result;
- }
-}