You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/07/14 06:07:17 UTC
[02/12] hive git commit: HIVE-20006: Make materializations
invalidation cache work with multiple active remote metastores (Jesus Camacho
Rodriguez, reviewed by Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
index 031e72b..1285c08 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py
@@ -14681,6 +14681,7 @@ class CreationMetadata:
- tblName
- tablesUsed
- validTxnList
+ - materializationTime
"""
thrift_spec = (
@@ -14690,14 +14691,16 @@ class CreationMetadata:
(3, TType.STRING, 'tblName', None, None, ), # 3
(4, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 4
(5, TType.STRING, 'validTxnList', None, None, ), # 5
+ (6, TType.I64, 'materializationTime', None, None, ), # 6
)
- def __init__(self, catName=None, dbName=None, tblName=None, tablesUsed=None, validTxnList=None,):
+ def __init__(self, catName=None, dbName=None, tblName=None, tablesUsed=None, validTxnList=None, materializationTime=None,):
self.catName = catName
self.dbName = dbName
self.tblName = tblName
self.tablesUsed = tablesUsed
self.validTxnList = validTxnList
+ self.materializationTime = materializationTime
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -14738,6 +14741,11 @@ class CreationMetadata:
self.validTxnList = iprot.readString()
else:
iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.I64:
+ self.materializationTime = iprot.readI64()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -14771,6 +14779,10 @@ class CreationMetadata:
oprot.writeFieldBegin('validTxnList', TType.STRING, 5)
oprot.writeString(self.validTxnList)
oprot.writeFieldEnd()
+ if self.materializationTime is not None:
+ oprot.writeFieldBegin('materializationTime', TType.I64, 6)
+ oprot.writeI64(self.materializationTime)
+ oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
@@ -14793,6 +14805,7 @@ class CreationMetadata:
value = (value * 31) ^ hash(self.tblName)
value = (value * 31) ^ hash(self.tablesUsed)
value = (value * 31) ^ hash(self.validTxnList)
+ value = (value * 31) ^ hash(self.materializationTime)
return value
def __repr__(self):
@@ -17613,24 +17626,15 @@ class TableMeta:
class Materialization:
"""
Attributes:
- - tablesUsed
- - validTxnList
- - invalidationTime
- sourceTablesUpdateDeleteModified
"""
thrift_spec = (
None, # 0
- (1, TType.SET, 'tablesUsed', (TType.STRING,None), None, ), # 1
- (2, TType.STRING, 'validTxnList', None, None, ), # 2
- (3, TType.I64, 'invalidationTime', None, None, ), # 3
- (4, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 4
+ (1, TType.BOOL, 'sourceTablesUpdateDeleteModified', None, None, ), # 1
)
- def __init__(self, tablesUsed=None, validTxnList=None, invalidationTime=None, sourceTablesUpdateDeleteModified=None,):
- self.tablesUsed = tablesUsed
- self.validTxnList = validTxnList
- self.invalidationTime = invalidationTime
+ def __init__(self, sourceTablesUpdateDeleteModified=None,):
self.sourceTablesUpdateDeleteModified = sourceTablesUpdateDeleteModified
def read(self, iprot):
@@ -17643,26 +17647,6 @@ class Materialization:
if ftype == TType.STOP:
break
if fid == 1:
- if ftype == TType.SET:
- self.tablesUsed = set()
- (_etype763, _size760) = iprot.readSetBegin()
- for _i764 in xrange(_size760):
- _elem765 = iprot.readString()
- self.tablesUsed.add(_elem765)
- iprot.readSetEnd()
- else:
- iprot.skip(ftype)
- elif fid == 2:
- if ftype == TType.STRING:
- self.validTxnList = iprot.readString()
- else:
- iprot.skip(ftype)
- elif fid == 3:
- if ftype == TType.I64:
- self.invalidationTime = iprot.readI64()
- else:
- iprot.skip(ftype)
- elif fid == 4:
if ftype == TType.BOOL:
self.sourceTablesUpdateDeleteModified = iprot.readBool()
else:
@@ -17677,39 +17661,21 @@ class Materialization:
oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
return
oprot.writeStructBegin('Materialization')
- if self.tablesUsed is not None:
- oprot.writeFieldBegin('tablesUsed', TType.SET, 1)
- oprot.writeSetBegin(TType.STRING, len(self.tablesUsed))
- for iter766 in self.tablesUsed:
- oprot.writeString(iter766)
- oprot.writeSetEnd()
- oprot.writeFieldEnd()
- if self.validTxnList is not None:
- oprot.writeFieldBegin('validTxnList', TType.STRING, 2)
- oprot.writeString(self.validTxnList)
- oprot.writeFieldEnd()
- if self.invalidationTime is not None:
- oprot.writeFieldBegin('invalidationTime', TType.I64, 3)
- oprot.writeI64(self.invalidationTime)
- oprot.writeFieldEnd()
if self.sourceTablesUpdateDeleteModified is not None:
- oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 4)
+ oprot.writeFieldBegin('sourceTablesUpdateDeleteModified', TType.BOOL, 1)
oprot.writeBool(self.sourceTablesUpdateDeleteModified)
oprot.writeFieldEnd()
oprot.writeFieldStop()
oprot.writeStructEnd()
def validate(self):
- if self.tablesUsed is None:
- raise TProtocol.TProtocolException(message='Required field tablesUsed is unset!')
+ if self.sourceTablesUpdateDeleteModified is None:
+ raise TProtocol.TProtocolException(message='Required field sourceTablesUpdateDeleteModified is unset!')
return
def __hash__(self):
value = 17
- value = (value * 31) ^ hash(self.tablesUsed)
- value = (value * 31) ^ hash(self.validTxnList)
- value = (value * 31) ^ hash(self.invalidationTime)
value = (value * 31) ^ hash(self.sourceTablesUpdateDeleteModified)
return value
@@ -18586,44 +18552,44 @@ class WMFullResourcePlan:
elif fid == 2:
if ftype == TType.LIST:
self.pools = []
- (_etype770, _size767) = iprot.readListBegin()
- for _i771 in xrange(_size767):
- _elem772 = WMPool()
- _elem772.read(iprot)
- self.pools.append(_elem772)
+ (_etype763, _size760) = iprot.readListBegin()
+ for _i764 in xrange(_size760):
+ _elem765 = WMPool()
+ _elem765.read(iprot)
+ self.pools.append(_elem765)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.LIST:
self.mappings = []
- (_etype776, _size773) = iprot.readListBegin()
- for _i777 in xrange(_size773):
- _elem778 = WMMapping()
- _elem778.read(iprot)
- self.mappings.append(_elem778)
+ (_etype769, _size766) = iprot.readListBegin()
+ for _i770 in xrange(_size766):
+ _elem771 = WMMapping()
+ _elem771.read(iprot)
+ self.mappings.append(_elem771)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.LIST:
self.triggers = []
- (_etype782, _size779) = iprot.readListBegin()
- for _i783 in xrange(_size779):
- _elem784 = WMTrigger()
- _elem784.read(iprot)
- self.triggers.append(_elem784)
+ (_etype775, _size772) = iprot.readListBegin()
+ for _i776 in xrange(_size772):
+ _elem777 = WMTrigger()
+ _elem777.read(iprot)
+ self.triggers.append(_elem777)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.LIST:
self.poolTriggers = []
- (_etype788, _size785) = iprot.readListBegin()
- for _i789 in xrange(_size785):
- _elem790 = WMPoolTrigger()
- _elem790.read(iprot)
- self.poolTriggers.append(_elem790)
+ (_etype781, _size778) = iprot.readListBegin()
+ for _i782 in xrange(_size778):
+ _elem783 = WMPoolTrigger()
+ _elem783.read(iprot)
+ self.poolTriggers.append(_elem783)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -18644,29 +18610,29 @@ class WMFullResourcePlan:
if self.pools is not None:
oprot.writeFieldBegin('pools', TType.LIST, 2)
oprot.writeListBegin(TType.STRUCT, len(self.pools))
- for iter791 in self.pools:
- iter791.write(oprot)
+ for iter784 in self.pools:
+ iter784.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.mappings is not None:
oprot.writeFieldBegin('mappings', TType.LIST, 3)
oprot.writeListBegin(TType.STRUCT, len(self.mappings))
- for iter792 in self.mappings:
- iter792.write(oprot)
+ for iter785 in self.mappings:
+ iter785.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.triggers is not None:
oprot.writeFieldBegin('triggers', TType.LIST, 4)
oprot.writeListBegin(TType.STRUCT, len(self.triggers))
- for iter793 in self.triggers:
- iter793.write(oprot)
+ for iter786 in self.triggers:
+ iter786.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.poolTriggers is not None:
oprot.writeFieldBegin('poolTriggers', TType.LIST, 5)
oprot.writeListBegin(TType.STRUCT, len(self.poolTriggers))
- for iter794 in self.poolTriggers:
- iter794.write(oprot)
+ for iter787 in self.poolTriggers:
+ iter787.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -19140,11 +19106,11 @@ class WMGetAllResourcePlanResponse:
if fid == 1:
if ftype == TType.LIST:
self.resourcePlans = []
- (_etype798, _size795) = iprot.readListBegin()
- for _i799 in xrange(_size795):
- _elem800 = WMResourcePlan()
- _elem800.read(iprot)
- self.resourcePlans.append(_elem800)
+ (_etype791, _size788) = iprot.readListBegin()
+ for _i792 in xrange(_size788):
+ _elem793 = WMResourcePlan()
+ _elem793.read(iprot)
+ self.resourcePlans.append(_elem793)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -19161,8 +19127,8 @@ class WMGetAllResourcePlanResponse:
if self.resourcePlans is not None:
oprot.writeFieldBegin('resourcePlans', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.resourcePlans))
- for iter801 in self.resourcePlans:
- iter801.write(oprot)
+ for iter794 in self.resourcePlans:
+ iter794.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -19466,20 +19432,20 @@ class WMValidateResourcePlanResponse:
if fid == 1:
if ftype == TType.LIST:
self.errors = []
- (_etype805, _size802) = iprot.readListBegin()
- for _i806 in xrange(_size802):
- _elem807 = iprot.readString()
- self.errors.append(_elem807)
+ (_etype798, _size795) = iprot.readListBegin()
+ for _i799 in xrange(_size795):
+ _elem800 = iprot.readString()
+ self.errors.append(_elem800)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.LIST:
self.warnings = []
- (_etype811, _size808) = iprot.readListBegin()
- for _i812 in xrange(_size808):
- _elem813 = iprot.readString()
- self.warnings.append(_elem813)
+ (_etype804, _size801) = iprot.readListBegin()
+ for _i805 in xrange(_size801):
+ _elem806 = iprot.readString()
+ self.warnings.append(_elem806)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -19496,15 +19462,15 @@ class WMValidateResourcePlanResponse:
if self.errors is not None:
oprot.writeFieldBegin('errors', TType.LIST, 1)
oprot.writeListBegin(TType.STRING, len(self.errors))
- for iter814 in self.errors:
- oprot.writeString(iter814)
+ for iter807 in self.errors:
+ oprot.writeString(iter807)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.warnings is not None:
oprot.writeFieldBegin('warnings', TType.LIST, 2)
oprot.writeListBegin(TType.STRING, len(self.warnings))
- for iter815 in self.warnings:
- oprot.writeString(iter815)
+ for iter808 in self.warnings:
+ oprot.writeString(iter808)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -20081,11 +20047,11 @@ class WMGetTriggersForResourePlanResponse:
if fid == 1:
if ftype == TType.LIST:
self.triggers = []
- (_etype819, _size816) = iprot.readListBegin()
- for _i820 in xrange(_size816):
- _elem821 = WMTrigger()
- _elem821.read(iprot)
- self.triggers.append(_elem821)
+ (_etype812, _size809) = iprot.readListBegin()
+ for _i813 in xrange(_size809):
+ _elem814 = WMTrigger()
+ _elem814.read(iprot)
+ self.triggers.append(_elem814)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -20102,8 +20068,8 @@ class WMGetTriggersForResourePlanResponse:
if self.triggers is not None:
oprot.writeFieldBegin('triggers', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.triggers))
- for iter822 in self.triggers:
- iter822.write(oprot)
+ for iter815 in self.triggers:
+ iter815.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -21287,11 +21253,11 @@ class SchemaVersion:
elif fid == 4:
if ftype == TType.LIST:
self.cols = []
- (_etype826, _size823) = iprot.readListBegin()
- for _i827 in xrange(_size823):
- _elem828 = FieldSchema()
- _elem828.read(iprot)
- self.cols.append(_elem828)
+ (_etype819, _size816) = iprot.readListBegin()
+ for _i820 in xrange(_size816):
+ _elem821 = FieldSchema()
+ _elem821.read(iprot)
+ self.cols.append(_elem821)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -21351,8 +21317,8 @@ class SchemaVersion:
if self.cols is not None:
oprot.writeFieldBegin('cols', TType.LIST, 4)
oprot.writeListBegin(TType.STRUCT, len(self.cols))
- for iter829 in self.cols:
- iter829.write(oprot)
+ for iter822 in self.cols:
+ iter822.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.state is not None:
@@ -21607,11 +21573,11 @@ class FindSchemasByColsResp:
if fid == 1:
if ftype == TType.LIST:
self.schemaVersions = []
- (_etype833, _size830) = iprot.readListBegin()
- for _i834 in xrange(_size830):
- _elem835 = SchemaVersionDescriptor()
- _elem835.read(iprot)
- self.schemaVersions.append(_elem835)
+ (_etype826, _size823) = iprot.readListBegin()
+ for _i827 in xrange(_size823):
+ _elem828 = SchemaVersionDescriptor()
+ _elem828.read(iprot)
+ self.schemaVersions.append(_elem828)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -21628,8 +21594,8 @@ class FindSchemasByColsResp:
if self.schemaVersions is not None:
oprot.writeFieldBegin('schemaVersions', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.schemaVersions))
- for iter836 in self.schemaVersions:
- iter836.write(oprot)
+ for iter829 in self.schemaVersions:
+ iter829.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 0348ff2..a0fabfe 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -3277,13 +3277,15 @@ class CreationMetadata
TBLNAME = 3
TABLESUSED = 4
VALIDTXNLIST = 5
+ MATERIALIZATIONTIME = 6
FIELDS = {
CATNAME => {:type => ::Thrift::Types::STRING, :name => 'catName'},
DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'},
TBLNAME => {:type => ::Thrift::Types::STRING, :name => 'tblName'},
TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}},
- VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true}
+ VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true},
+ MATERIALIZATIONTIME => {:type => ::Thrift::Types::I64, :name => 'materializationTime', :optional => true}
}
def struct_fields; FIELDS; end
@@ -3952,22 +3954,16 @@ end
class Materialization
include ::Thrift::Struct, ::Thrift::Struct_Union
- TABLESUSED = 1
- VALIDTXNLIST = 2
- INVALIDATIONTIME = 3
- SOURCETABLESUPDATEDELETEMODIFIED = 4
+ SOURCETABLESUPDATEDELETEMODIFIED = 1
FIELDS = {
- TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}},
- VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList', :optional => true},
- INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime', :optional => true},
- SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name => 'sourceTablesUpdateDeleteModified', :optional => true}
+ SOURCETABLESUPDATEDELETEMODIFIED => {:type => ::Thrift::Types::BOOL, :name => 'sourceTablesUpdateDeleteModified'}
}
def struct_fields; FIELDS; end
def validate
- raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablesUsed is unset!') unless @tablesUsed
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field sourceTablesUpdateDeleteModified is unset!') if @sourceTablesUpdateDeleteModified.nil?
end
::Thrift::Struct.generate_accessors self
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index 2bd958e..5ecfbed 100644
--- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -726,13 +726,13 @@ module ThriftHiveMetastore
raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_table_objects_by_name_req failed: unknown result')
end
- def get_materialization_invalidation_info(dbname, tbl_names)
- send_get_materialization_invalidation_info(dbname, tbl_names)
+ def get_materialization_invalidation_info(creation_metadata, validTxnList)
+ send_get_materialization_invalidation_info(creation_metadata, validTxnList)
return recv_get_materialization_invalidation_info()
end
- def send_get_materialization_invalidation_info(dbname, tbl_names)
- send_message('get_materialization_invalidation_info', Get_materialization_invalidation_info_args, :dbname => dbname, :tbl_names => tbl_names)
+ def send_get_materialization_invalidation_info(creation_metadata, validTxnList)
+ send_message('get_materialization_invalidation_info', Get_materialization_invalidation_info_args, :creation_metadata => creation_metadata, :validTxnList => validTxnList)
end
def recv_get_materialization_invalidation_info()
@@ -4043,7 +4043,7 @@ module ThriftHiveMetastore
args = read_args(iprot, Get_materialization_invalidation_info_args)
result = Get_materialization_invalidation_info_result.new()
begin
- result.success = @handler.get_materialization_invalidation_info(args.dbname, args.tbl_names)
+ result.success = @handler.get_materialization_invalidation_info(args.creation_metadata, args.validTxnList)
rescue ::MetaException => o1
result.o1 = o1
rescue ::InvalidOperationException => o2
@@ -7654,12 +7654,12 @@ module ThriftHiveMetastore
class Get_materialization_invalidation_info_args
include ::Thrift::Struct, ::Thrift::Struct_Union
- DBNAME = 1
- TBL_NAMES = 2
+ CREATION_METADATA = 1
+ VALIDTXNLIST = 2
FIELDS = {
- DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'},
- TBL_NAMES => {:type => ::Thrift::Types::LIST, :name => 'tbl_names', :element => {:type => ::Thrift::Types::STRING}}
+ CREATION_METADATA => {:type => ::Thrift::Types::STRUCT, :name => 'creation_metadata', :class => ::CreationMetadata},
+ VALIDTXNLIST => {:type => ::Thrift::Types::STRING, :name => 'validTxnList'}
}
def struct_fields; FIELDS; end
@@ -7678,7 +7678,7 @@ module ThriftHiveMetastore
O3 = 3
FIELDS = {
- SUCCESS => {:type => ::Thrift::Types::MAP, :name => 'success', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRUCT, :class => ::Materialization}},
+ SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::Materialization},
O1 => {:type => ::Thrift::Types::STRUCT, :name => 'o1', :class => ::MetaException},
O2 => {:type => ::Thrift::Types::STRUCT, :name => 'o2', :class => ::InvalidOperationException},
O3 => {:type => ::Thrift::Types::STRUCT, :name => 'o3', :class => ::UnknownDBException}
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 8d88749..e6f7333 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -3009,8 +3009,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
@Override
- public Map<String, Materialization> get_materialization_invalidation_info(final String dbName, final List<String> tableNames) {
- return MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(dbName, tableNames);
+ public Materialization get_materialization_invalidation_info(final CreationMetadata cm, final String validTxnList) throws MetaException {
+ return getTxnHandler().getMaterializationInvalidationInfo(cm, validTxnList);
}
@Override
@@ -8670,13 +8670,13 @@ public class HiveMetaStore extends ThriftHiveMetastore {
@Override
public LockResponse get_lock_materialization_rebuild(String dbName, String tableName, long txnId)
throws TException {
- return MaterializationsRebuildLockHandler.get().lockResource(dbName, tableName, txnId);
+ return getTxnHandler().lockMaterializationRebuild(dbName, tableName, txnId);
}
@Override
public boolean heartbeat_lock_materialization_rebuild(String dbName, String tableName, long txnId)
throws TException {
- return MaterializationsRebuildLockHandler.get().refreshLockResource(dbName, tableName, txnId);
+ return getTxnHandler().heartbeatLockMaterializationRebuild(dbName, tableName, txnId);
}
@Override
@@ -8992,8 +8992,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
false);
IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
- // Initialize materializations invalidation cache
- MaterializationsInvalidationCache.get().init(conf, handler);
TServerSocket serverSocket;
if (useSasl) {
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index bfd7141..acdb73b 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -167,8 +167,6 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
// instantiate the metastore server handler directly instead of connecting
// through the network
client = HiveMetaStore.newRetryingHMSHandler("hive client", this.conf, true);
- // Initialize materializations invalidation cache (only for local metastore)
- MaterializationsInvalidationCache.get().init(conf, (IHMSHandler) client);
isConnected = true;
snapshotActiveConf();
return;
@@ -1610,10 +1608,9 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
}
@Override
- public Map<String, Materialization> getMaterializationsInvalidationInfo(String dbName, List<String> viewNames)
+ public Materialization getMaterializationInvalidationInfo(CreationMetadata cm, String validTxnList)
throws MetaException, InvalidOperationException, UnknownDBException, TException {
- return client.get_materialization_invalidation_info(
- dbName, filterHook.filterTableNames(getDefaultCatalog(conf), dbName, viewNames));
+ return client.get_materialization_invalidation_info(cm, validTxnList);
}
@Override
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index b5d147b..9661beb 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -775,7 +775,7 @@ public interface IMetaStoreClient {
/**
* Returns the invalidation information for the materialized views given as input.
*/
- Map<String, Materialization> getMaterializationsInvalidationInfo(String dbName, List<String> viewNames)
+ Materialization getMaterializationInvalidationInfo(CreationMetadata cm, String validTxnList)
throws MetaException, InvalidOperationException, UnknownDBException, TException;
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
deleted file mode 100644
index cc168a9..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsCacheCleanerTask.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Task responsible for cleaning the transactions that are not useful from the
- * materializations cache.
- */
-public class MaterializationsCacheCleanerTask implements MetastoreTaskThread {
- private static final Logger LOG = LoggerFactory.getLogger(MaterializationsCacheCleanerTask.class);
-
- private Configuration conf;
-
- @Override
- public long runFrequency(TimeUnit unit) {
- return MetastoreConf.getTimeVar(conf,
- MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_CLEAN_FREQUENCY, unit);
- }
-
- @Override
- public void setConf(Configuration configuration) {
- conf = configuration;
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public void run() {
- long removedCnt = MaterializationsInvalidationCache.get().cleanup(System.currentTimeMillis() -
- MetastoreConf.getTimeVar(conf,
- MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_EXPIRY_DURATION, TimeUnit.MILLISECONDS));
- if (removedCnt > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Number of transaction entries deleted from materializations cache: " + removedCnt);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
deleted file mode 100644
index fc644f0..0000000
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
+++ /dev/null
@@ -1,543 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-import org.apache.hadoop.conf.Configuration;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
-import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
-import org.apache.hadoop.hive.metastore.api.LockResponse;
-import org.apache.hadoop.hive.metastore.api.Materialization;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-/**
- * This cache keeps information in memory about the table modifications so materialized views
- * can verify their invalidation time, i.e., the moment after materialization on which the
- * first transaction to the tables they used happened. This information is kept in memory
- * to check the invalidation quickly. However, we store enough information in the metastore
- * to bring this cache up if the metastore is restarted or would crashed. This cache lives
- * in the metastore server.
- */
-public final class MaterializationsInvalidationCache {
-
- private static final Logger LOG = LoggerFactory.getLogger(MaterializationsInvalidationCache.class);
-
- /* Singleton */
- private static final MaterializationsInvalidationCache SINGLETON = new MaterializationsInvalidationCache();
-
- /* If this boolean is true, this class has no functionality. Only for debugging purposes. */
- private boolean disable;
-
- /* Key is the database name. Each value is a map from the unique view qualified name to
- * the materialization invalidation info. This invalidation object contains information
- * such as the tables used by the materialized view, whether there was any update or
- * delete in the source tables since the materialized view was created or rebuilt,
- * or the invalidation time, i.e., first modification of the tables used by materialized
- * view after the view was created. */
- private final ConcurrentMap<String, ConcurrentMap<String, Materialization>> materializations =
- new ConcurrentHashMap<>();
-
- /*
- * Key is a qualified table name. The value is a (sorted) tree map (supporting concurrent
- * modifications) that will keep the modifications for a given table in the order of their
- * transaction id. This is useful to quickly check the invalidation time for a given
- * materialization.
- */
- private final ConcurrentMap<String, ConcurrentSkipListMap<Long, Long>> tableModifications =
- new ConcurrentHashMap<>();
-
- private final ConcurrentMap<String, ConcurrentSkipListSet<Long>> updateDeleteTableModifications =
- new ConcurrentHashMap<>();
-
- /* Whether the cache has been initialized or not. */
- private boolean initialized;
- /* Configuration for cache. */
- private Configuration conf;
- /* Handler to connect to metastore. */
- private IHMSHandler handler;
-
- private MaterializationsInvalidationCache() {
- }
-
- /**
- * Get instance of MaterializationsInvalidationCache.
- *
- * @return the singleton
- */
- public static MaterializationsInvalidationCache get() {
- return SINGLETON;
- }
-
- /**
- * Initialize the invalidation cache.
- *
- * The method is synchronized because we want to avoid initializing the invalidation cache
- * multiple times in embedded mode. This will not happen when we run the metastore remotely
- * as the method is called only once.
- */
- public synchronized void init(Configuration conf, IHMSHandler handler) {
- this.conf = conf;
- this.handler = handler;
-
- // This will only be true for debugging purposes
- this.disable = MetastoreConf.getVar(conf,
- MetastoreConf.ConfVars.MATERIALIZATIONS_INVALIDATION_CACHE_IMPL).equals("DISABLE");
- if (disable) {
- // Nothing to do
- return;
- }
-
- if (!initialized) {
- this.initialized = true;
- ExecutorService pool = Executors.newCachedThreadPool();
- pool.submit(new Loader());
- pool.shutdown();
- }
- }
-
- private class Loader implements Runnable {
- @Override
- public void run() {
- try {
- RawStore store = handler.getMS();
- for (String catName : store.getCatalogs()) {
- for (String dbName : store.getAllDatabases(catName)) {
- for (Table mv : store.getTableObjectsByName(catName, dbName,
- store.getTables(catName, dbName, null, TableType.MATERIALIZED_VIEW))) {
- addMaterializedView(mv.getDbName(), mv.getTableName(), ImmutableSet.copyOf(mv.getCreationMetadata().getTablesUsed()),
- mv.getCreationMetadata().getValidTxnList(), OpType.LOAD);
- }
- }
- }
- LOG.info("Initialized materializations invalidation cache");
- } catch (Exception e) {
- LOG.error("Problem connecting to the metastore when initializing the view registry");
- }
- }
- }
-
- /**
- * Adds a newly created materialized view to the cache.
- *
- * @param dbName
- * @param tableName
- * @param tablesUsed tables used by the materialized view
- * @param validTxnList
- */
- public void createMaterializedView(String dbName, String tableName, Set<String> tablesUsed,
- String validTxnList) {
- addMaterializedView(dbName, tableName, tablesUsed, validTxnList, OpType.CREATE);
- }
-
- /**
- * Method to call when materialized view is modified.
- *
- * @param dbName
- * @param tableName
- * @param tablesUsed tables used by the materialized view
- * @param validTxnList
- */
- public void alterMaterializedView(String dbName, String tableName, Set<String> tablesUsed,
- String validTxnList) {
- addMaterializedView(dbName, tableName, tablesUsed, validTxnList, OpType.ALTER);
- }
-
- /**
- * Adds the materialized view to the cache.
- *
- * @param dbName
- * @param tableName
- * @param tablesUsed tables used by the materialized view
- * @param validTxnList
- * @param opType
- */
- private void addMaterializedView(String dbName, String tableName, Set<String> tablesUsed,
- String validTxnList, OpType opType) {
- if (disable) {
- // Nothing to do
- return;
- }
- // We are going to create the map for each view in the given database
- ConcurrentMap<String, Materialization> cq =
- new ConcurrentHashMap<String, Materialization>();
- final ConcurrentMap<String, Materialization> prevCq = materializations.putIfAbsent(
- dbName, cq);
- if (prevCq != null) {
- cq = prevCq;
- }
- // Start the process to add materialization to the cache
- // Before loading the materialization in the cache, we need to update some
- // important information in the registry to account for rewriting invalidation
- if (validTxnList == null) {
- // This can happen when the materialized view was created on non-transactional tables
- return;
- }
- if (opType == OpType.CREATE || opType == OpType.ALTER) {
- // You store the materialized view
- Materialization materialization = new Materialization(tablesUsed);
- materialization.setValidTxnList(validTxnList);
- cq.put(tableName, materialization);
- } else {
- ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(validTxnList);
- for (String qNameTableUsed : tablesUsed) {
- ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed);
- // First we insert a new tree set to keep table modifications, unless it already exists
- ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>();
- final ConcurrentSkipListMap<Long, Long> prevModificationsTree = tableModifications.putIfAbsent(
- qNameTableUsed, modificationsTree);
- if (prevModificationsTree != null) {
- modificationsTree = prevModificationsTree;
- }
- // If we are not creating the MV at this instant, but instead it was created previously
- // and we are loading it into the cache, we need to go through the transaction entries and
- // check if the MV is still valid.
- try {
- String[] names = qNameTableUsed.split("\\.");
- BasicTxnInfo e = handler.getTxnHandler().getFirstCompletedTransactionForTableAfterCommit(
- names[0], names[1], tableTxnList);
- if (!e.isIsnull()) {
- modificationsTree.put(e.getTxnid(), e.getTime());
- // We do not need to do anything more for current table, as we detected
- // a modification event that was in the metastore.
- continue;
- }
- } catch (MetaException ex) {
- LOG.debug("Materialized view " + Warehouse.getQualifiedName(dbName, tableName) +
- " ignored; error loading view into invalidation cache", ex);
- return;
- }
- }
- // For LOAD, you only add it if it does exist as you might be loading an outdated MV
- Materialization materialization = new Materialization(tablesUsed);
- materialization.setValidTxnList(validTxnList);
- cq.putIfAbsent(tableName, materialization);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cached materialized view for rewriting in invalidation cache: " +
- Warehouse.getQualifiedName(dbName, tableName));
- }
- }
-
- /**
- * This method is called when a table is modified. That way we can keep track of the
- * invalidation for the MVs that use that table.
- */
- public void notifyTableModification(String dbName, String tableName,
- long txnId, long newModificationTime, boolean isUpdateDelete) {
- if (disable) {
- // Nothing to do
- return;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}",
- tableName, dbName, txnId, newModificationTime);
- }
- if (isUpdateDelete) {
- // We update first the update/delete modifications record
- ConcurrentSkipListSet<Long> modificationsSet = new ConcurrentSkipListSet<>();
- final ConcurrentSkipListSet<Long> prevModificationsSet =
- updateDeleteTableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName),
- modificationsSet);
- if (prevModificationsSet != null) {
- modificationsSet = prevModificationsSet;
- }
- modificationsSet.add(txnId);
- }
- ConcurrentSkipListMap<Long, Long> modificationsTree = new ConcurrentSkipListMap<>();
- final ConcurrentSkipListMap<Long, Long> prevModificationsTree =
- tableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), modificationsTree);
- if (prevModificationsTree != null) {
- modificationsTree = prevModificationsTree;
- }
- modificationsTree.put(txnId, newModificationTime);
- }
-
- /**
- * Removes the materialized view from the cache.
- *
- * @param dbName
- * @param tableName
- */
- public void dropMaterializedView(String dbName, String tableName) {
- if (disable) {
- // Nothing to do
- return;
- }
- materializations.get(dbName).remove(tableName);
- }
-
- /**
- * Returns the materialized views in the cache for the given database.
- *
- * @param dbName the database
- * @return the collection of materialized views, or the empty collection if none
- */
- public Map<String, Materialization> getMaterializationInvalidationInfo(
- String dbName, List<String> materializationNames) {
- if (materializations.get(dbName) != null) {
- ImmutableMap.Builder<String, Materialization> m = ImmutableMap.builder();
- for (String materializationName : materializationNames) {
- Materialization materialization =
- materializations.get(dbName).get(materializationName);
- if (materialization == null) {
- LOG.debug("Materialization {} skipped as there is no information "
- + "in the invalidation cache about it", materializationName);
- continue;
- }
- // We create a deep copy of the materialization, as we need to set the time
- // and whether any update/delete operation happen on the tables that it uses
- // since it was created.
- Materialization materializationCopy = new Materialization(
- materialization.getTablesUsed());
- materializationCopy.setValidTxnList(materialization.getValidTxnList());
- enrichWithInvalidationInfo(materializationCopy);
- m.put(materializationName, materializationCopy);
- }
- Map<String, Materialization> result = m.build();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Retrieved the following materializations from the invalidation cache: {}", result);
- }
- return result;
- }
- return ImmutableMap.of();
- }
-
- private void enrichWithInvalidationInfo(Materialization materialization) {
- String materializationTxnListString = materialization.getValidTxnList();
- if (materializationTxnListString == null) {
- // This can happen when the materialization was created on non-transactional tables
- materialization.setInvalidationTime(Long.MIN_VALUE);
- return;
- }
-
- // We will obtain the modification time as follows.
- // First, we obtain the first element after high watermark (if any)
- // Then, we iterate through the elements from min open txn till high
- // watermark, updating the modification time after creation if needed
- ValidTxnWriteIdList materializationTxnList = new ValidTxnWriteIdList(materializationTxnListString);
- long firstModificationTimeAfterCreation = 0L;
- boolean containsUpdateDelete = false;
- for (String qNameTableUsed : materialization.getTablesUsed()) {
- final ValidWriteIdList tableMaterializationTxnList =
- materializationTxnList.getTableValidWriteIdList(qNameTableUsed);
-
- final ConcurrentSkipListMap<Long, Long> usedTableModifications =
- tableModifications.get(qNameTableUsed);
- if (usedTableModifications == null) {
- // This is not necessarily an error, since the table may be empty. To be safe,
- // instead of including this materialized view, we just log the information and
- // skip it (if table is really empty, it will not matter for performance anyway).
- LOG.warn("No information found in invalidation cache for table {}, possible tables are: {}",
- qNameTableUsed, tableModifications.keySet());
- materialization.setInvalidationTime(Long.MIN_VALUE);
- return;
- }
- final ConcurrentSkipListSet<Long> usedUDTableModifications =
- updateDeleteTableModifications.get(qNameTableUsed);
- final Entry<Long, Long> tn = usedTableModifications.higherEntry(tableMaterializationTxnList.getHighWatermark());
- if (tn != null) {
- if (firstModificationTimeAfterCreation == 0L ||
- tn.getValue() < firstModificationTimeAfterCreation) {
- firstModificationTimeAfterCreation = tn.getValue();
- }
- // Check if there was any update/delete after creation
- containsUpdateDelete = usedUDTableModifications != null &&
- !usedUDTableModifications.tailSet(tableMaterializationTxnList.getHighWatermark(), false).isEmpty();
- }
- // Min open txn might be null if there were no open transactions
- // when this transaction was being executed
- if (tableMaterializationTxnList.getMinOpenWriteId() != null) {
- // Invalid transaction list is sorted
- int pos = 0;
- for (Map.Entry<Long, Long> t : usedTableModifications
- .subMap(tableMaterializationTxnList.getMinOpenWriteId(), tableMaterializationTxnList.getHighWatermark()).entrySet()) {
- while (pos < tableMaterializationTxnList.getInvalidWriteIds().length &&
- tableMaterializationTxnList.getInvalidWriteIds()[pos] != t.getKey()) {
- pos++;
- }
- if (pos >= tableMaterializationTxnList.getInvalidWriteIds().length) {
- break;
- }
- if (firstModificationTimeAfterCreation == 0L ||
- t.getValue() < firstModificationTimeAfterCreation) {
- firstModificationTimeAfterCreation = t.getValue();
- }
- containsUpdateDelete = containsUpdateDelete ||
- (usedUDTableModifications != null && usedUDTableModifications.contains(t.getKey()));
- }
- }
- }
-
- materialization.setInvalidationTime(firstModificationTimeAfterCreation);
- materialization.setSourceTablesUpdateDeleteModified(containsUpdateDelete);
- }
-
- private enum OpType {
- CREATE,
- LOAD,
- ALTER
- }
-
- /**
- * Removes transaction events that are not relevant anymore.
- * @param minTime events generated before this time (ms) can be deleted from the cache
- * @return number of events that were deleted from the cache
- */
- public long cleanup(long minTime) {
- // To remove, mv should meet two conditions:
- // 1) Current time - time of transaction > config parameter, and
- // 2) Transaction should not be associated with invalidation of a MV
- if (disable || !initialized) {
- // Bail out
- return 0L;
- }
- // We execute the cleanup in two steps
- // First we gather all the transactions that need to be kept
- final Multimap<String, Long> keepTxnInfos = HashMultimap.create();
- for (Map.Entry<String, ConcurrentMap<String, Materialization>> e : materializations.entrySet()) {
- for (Materialization m : e.getValue().values()) {
- ValidTxnWriteIdList txnList = new ValidTxnWriteIdList(m.getValidTxnList());
- boolean canBeDeleted = false;
- String currentTableForInvalidatingTxn = null;
- long currentInvalidatingTxnId = 0L;
- long currentInvalidatingTxnTime = 0L;
- for (String qNameTableUsed : m.getTablesUsed()) {
- ValidWriteIdList tableTxnList = txnList.getTableValidWriteIdList(qNameTableUsed);
- final Entry<Long, Long> tn = tableModifications.get(qNameTableUsed)
- .higherEntry(tableTxnList.getHighWatermark());
- if (tn != null) {
- if (currentInvalidatingTxnTime == 0L ||
- tn.getValue() < currentInvalidatingTxnTime) {
- // This transaction 1) is the first one examined for this materialization, or
- // 2) it is the invalidating transaction. Hence we add it to the transactions to keep.
- // 1.- We remove the previous invalidating transaction from the transactions
- // to be kept (if needed).
- if (canBeDeleted && currentInvalidatingTxnTime < minTime) {
- keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId);
- }
- // 2.- We add this transaction to the transactions that should be kept.
- canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(tn.getKey());
- keepTxnInfos.put(qNameTableUsed, tn.getKey());
- // 3.- We record this transaction as the current invalidating transaction.
- currentTableForInvalidatingTxn = qNameTableUsed;
- currentInvalidatingTxnId = tn.getKey();
- currentInvalidatingTxnTime = tn.getValue();
- }
- }
- if (tableTxnList.getMinOpenWriteId() != null) {
- // Invalid transaction list is sorted
- int pos = 0;
- for (Entry<Long, Long> t : tableModifications.get(qNameTableUsed)
- .subMap(tableTxnList.getMinOpenWriteId(), tableTxnList.getHighWatermark()).entrySet()) {
- while (pos < tableTxnList.getInvalidWriteIds().length &&
- tableTxnList.getInvalidWriteIds()[pos] != t.getKey()) {
- pos++;
- }
- if (pos >= tableTxnList.getInvalidWriteIds().length) {
- break;
- }
- if (currentInvalidatingTxnTime == 0L ||
- t.getValue() < currentInvalidatingTxnTime) {
- // This transaction 1) is the first one examined for this materialization, or
- // 2) it is the invalidating transaction. Hence we add it to the transactions to keep.
- // 1.- We remove the previous invalidating transaction from the transactions
- // to be kept (if needed).
- if (canBeDeleted && currentInvalidatingTxnTime < minTime) {
- keepTxnInfos.remove(currentTableForInvalidatingTxn, currentInvalidatingTxnId);
- }
- // 2.- We add this transaction to the transactions that should be kept.
- canBeDeleted = !keepTxnInfos.get(qNameTableUsed).contains(t.getKey());
- keepTxnInfos.put(qNameTableUsed, t.getKey());
- // 3.- We record this transaction as the current invalidating transaction.
- currentTableForInvalidatingTxn = qNameTableUsed;
- currentInvalidatingTxnId = t.getKey();
- currentInvalidatingTxnTime = t.getValue();
- }
- }
- }
- }
- }
- }
- // Second, we remove the transactions
- long removed = 0L;
- for (Entry<String, ConcurrentSkipListMap<Long, Long>> e : tableModifications.entrySet()) {
- Collection<Long> c = keepTxnInfos.get(e.getKey());
- ConcurrentSkipListSet<Long> updateDeleteForTable = updateDeleteTableModifications.get(e.getKey());
- for (Iterator<Entry<Long, Long>> it = e.getValue().entrySet().iterator(); it.hasNext();) {
- Entry<Long, Long> v = it.next();
- // We need to check again the time because some of the transactions might not be explored
- // above, e.g., transactions above the highest transaction mark for all the materialized
- // views.
- if (v.getValue() < minTime && (c.isEmpty() || !c.contains(v.getKey()))) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Transaction removed from cache for table {} -> id: {}, time: {}",
- e.getKey(), v.getKey(), v.getValue());
- }
- if (updateDeleteForTable != null) {
- updateDeleteForTable.remove(v.getKey());
- }
- it.remove();
- removed++;
- }
- }
- }
- return removed;
- }
-
- /**
- * Checks whether the given materialization exists in the invalidation cache.
- * @param dbName the database name for the materialization
- * @param tblName the table name for the materialization
- * @return true if we have information about the materialization in the cache,
- * false otherwise
- */
- public boolean containsMaterialization(String dbName, String tblName) {
- if (disable || dbName == null || tblName == null) {
- return false;
- }
- ConcurrentMap<String, Materialization> dbMaterializations = materializations.get(dbName);
- if (dbMaterializations == null || dbMaterializations.get(tblName) == null) {
- // This is a table
- return false;
- }
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
index 8ca9ede..9ce7d6d 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
@@ -18,7 +18,10 @@
package org.apache.hadoop.hive.metastore;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,6 +35,7 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre
private static final Logger LOG = LoggerFactory.getLogger(MaterializationsRebuildLockCleanerTask.class);
private Configuration conf;
+ private TxnStore txnHandler;
@Override
public long runFrequency(TimeUnit unit) {
@@ -41,6 +45,7 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre
@Override
public void setConf(Configuration configuration) {
conf = configuration;
+ txnHandler = TxnUtils.getTxnStore(conf);
}
@Override
@@ -50,11 +55,26 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre
@Override
public void run() {
- long removedCnt = MaterializationsRebuildLockHandler.get().cleanupResourceLocks(
- MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS));
- if (removedCnt > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.info("Number of materialization locks deleted: " + removedCnt);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cleaning up materialization rebuild locks");
+ }
+
+ TxnStore.MutexAPI.LockHandle handle = null;
+ try {
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.MaterializationRebuild.name());
+ ValidTxnList validTxnList = TxnUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+ long removedCnt = txnHandler.cleanupMaterializationRebuildLocks(validTxnList,
+ MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS));
+ if (removedCnt > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Number of materialization locks deleted: " + removedCnt);
+ }
+ }
+ } catch(Throwable t) {
+ LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+ } finally {
+ if(handle != null) {
+ handle.releaseLocks();
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index 8721022..bdcbf41 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -1335,13 +1335,6 @@ public class ObjectStore implements RawStore, Configurable {
} finally {
if (!commited) {
rollbackTransaction();
- } else {
- if (MetaStoreUtils.isMaterializedViewTable(tbl)) {
- // Add to the invalidation cache
- MaterializationsInvalidationCache.get().createMaterializedView(
- tbl.getDbName(), tbl.getTableName(), tbl.getCreationMetadata().getTablesUsed(),
- tbl.getCreationMetadata().getValidTxnList());
- }
}
}
}
@@ -1439,10 +1432,6 @@ public class ObjectStore implements RawStore, Configurable {
} finally {
if (!success) {
rollbackTransaction();
- } else {
- if (materializedView) {
- MaterializationsInvalidationCache.get().dropMaterializedView(dbName, tableName);
- }
}
}
return success;
@@ -2285,13 +2274,14 @@ public class ObjectStore implements RawStore, Configurable {
if (m == null) {
return null;
}
+ assert !m.isSetMaterializationTime();
Set<MTable> tablesUsed = new HashSet<>();
for (String fullyQualifiedName : m.getTablesUsed()) {
String[] names = fullyQualifiedName.split("\\.");
tablesUsed.add(getMTable(m.getCatName(), names[0], names[1], false).mtbl);
}
return new MCreationMetadata(m.getCatName(), m.getDbName(), m.getTblName(),
- tablesUsed, m.getValidTxnList());
+ tablesUsed, m.getValidTxnList(), System.currentTimeMillis());
}
private CreationMetadata convertToCreationMetadata(
@@ -2307,6 +2297,7 @@ public class ObjectStore implements RawStore, Configurable {
}
CreationMetadata r = new CreationMetadata(s.getCatalogName(),
s.getDbName(), s.getTblName(), tablesUsed);
+ r.setMaterializationTime(s.getMaterializationTime());
if (s.getTxnList() != null) {
r.setValidTxnList(s.getTxnList());
}
@@ -4210,16 +4201,13 @@ public class ObjectStore implements RawStore, Configurable {
MCreationMetadata newMcm = convertToMCreationMetadata(cm);
MCreationMetadata mcm = getCreationMetadata(catName, dbname, tablename);
mcm.setTables(newMcm.getTables());
+ mcm.setMaterializationTime(newMcm.getMaterializationTime());
mcm.setTxnList(newMcm.getTxnList());
// commit the changes
success = commitTransaction();
} finally {
if (!success) {
rollbackTransaction();
- } else {
- // Add to the invalidation cache if the creation signature has changed
- MaterializationsInvalidationCache.get().alterMaterializedView(
- dbname, tablename, cm.getTablesUsed(), cm.getValidTxnList());
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 74a301f..c2bbba5 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.DefaultStorageSchemaReader;
import org.apache.hadoop.hive.metastore.HiveAlterHandler;
-import org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask;
import org.apache.hadoop.hive.metastore.MaterializationsRebuildLockCleanerTask;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.RuntimeStatsCleanerTask;
@@ -762,8 +761,6 @@ public class MetastoreConf {
TASK_THREADS_ALWAYS("metastore.task.threads.always", "metastore.task.threads.always",
EventCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," +
"org.apache.hadoop.hive.metastore.repl.DumpDirCleanerTask" + "," +
- MaterializationsCacheCleanerTask.class.getName() + "," +
- MaterializationsRebuildLockCleanerTask.class.getName() + "," + RuntimeStatsCleanerTask.class.getName() + "," +
"org.apache.hadoop.hive.metastore.HiveProtoEventsCleanerTask",
"Comma separated list of tasks that will be started in separate threads. These will " +
"always be started, regardless of whether the metastore is running in embedded mode " +
@@ -772,7 +769,8 @@ public class MetastoreConf {
AcidHouseKeeperService.class.getName() + "," +
AcidOpenTxnsCounterService.class.getName() + "," +
AcidCompactionHistoryService.class.getName() + "," +
- AcidWriteSetService.class.getName(),
+ AcidWriteSetService.class.getName() + "," +
+ MaterializationsRebuildLockCleanerTask.class.getName(),
"Command separated list of tasks that will be started in separate threads. These will be" +
" started only when the metastore is running as a separate service. They must " +
"implement " + MetastoreTaskThread.class.getName()),
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java
index 66b5d48..2d65126 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/model/MCreationMetadata.java
@@ -22,8 +22,8 @@ import java.util.Set;
/**
* Represents the creation metadata of a materialization.
* It includes the database and table name for the materialization,
- * the set of tables that it uses, and the valid transaction list
- * when it was created.
+ * the set of tables that it uses, the valid transaction list
+ * when it was created, and the creation/rebuild time.
*/
public class MCreationMetadata {
@@ -32,17 +32,19 @@ public class MCreationMetadata {
private String tblName;
private Set<MTable> tables;
private String txnList;
+ private long materializationTime;
public MCreationMetadata() {
}
public MCreationMetadata(String catName, String dbName, String tblName,
- Set<MTable> tables, String txnList) {
+ Set<MTable> tables, String txnList, long materializationTime) {
this.catalogName = catName;
this.dbName = dbName;
this.tblName = tblName;
this.tables = tables;
this.txnList = txnList;
+ this.materializationTime = materializationTime;
}
public Set<MTable> getTables() {
@@ -84,4 +86,12 @@ public class MCreationMetadata {
public void setTblName(String tblName) {
this.tblName = tblName;
}
+
+ public long getMaterializationTime() {
+ return materializationTime;
+ }
+
+ public void setMaterializationTime(long materializationTime) {
+ this.materializationTime = materializationTime;
+ }
}
http://git-wip-us.apache.org/repos/asf/hive/blob/1b5903b0/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index f8c2ca2..2bae133 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -94,9 +94,9 @@ public final class TxnDbUtil {
" CTC_DATABASE varchar(128) NOT NULL," +
" CTC_TABLE varchar(128)," +
" CTC_PARTITION varchar(767)," +
- " CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL," +
" CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL," +
- " CTC_WRITEID bigint)");
+ " CTC_WRITEID bigint," +
+ " CTC_UPDATE_DELETE char(1) NOT NULL)");
stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)");
stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
@@ -194,6 +194,14 @@ public final class TxnDbUtil {
" PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID))"
);
+ stmt.execute("CREATE TABLE MATERIALIZATION_REBUILD_LOCKS (" +
+ " MRL_TXN_ID BIGINT NOT NULL, " +
+ " MRL_DB_NAME VARCHAR(128) NOT NULL, " +
+ " MRL_TBL_NAME VARCHAR(256) NOT NULL, " +
+ " MRL_LAST_HEARTBEAT BIGINT NOT NULL, " +
+ " PRIMARY KEY(MRL_TXN_ID))"
+ );
+
try {
stmt.execute("CREATE TABLE \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\" VARCHAR(256) NOT " +
@@ -336,6 +344,7 @@ public final class TxnDbUtil {
success &= dropTable(stmt, "AUX_TABLE", retryCount);
success &= dropTable(stmt, "WRITE_SET", retryCount);
success &= dropTable(stmt, "REPL_TXN_MAP", retryCount);
+ success &= dropTable(stmt, "MATERIALIZATION_REBUILD_LOCKS", retryCount);
/*
* Don't drop NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE as they are used by other
* tables which are not txn related to generate primary key. So if these tables are dropped