You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2018/01/11 21:54:42 UTC
[02/22] hive git commit: HIVE-14498: Freshness period for query
rewriting using materialized views (Jesus Camacho Rodriguez,
reviewed by Ashutosh Chauhan)
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index 46be4fb..afcec9e 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -809,6 +809,7 @@ class Table
PRIVILEGES = 13
TEMPORARY = 14
REWRITEENABLED = 15
+ CREATIONMETADATA = 16
FIELDS = {
TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'},
@@ -825,7 +826,8 @@ class Table
TABLETYPE => {:type => ::Thrift::Types::STRING, :name => 'tableType'},
PRIVILEGES => {:type => ::Thrift::Types::STRUCT, :name => 'privileges', :class => ::PrincipalPrivilegeSet, :optional => true},
TEMPORARY => {:type => ::Thrift::Types::BOOL, :name => 'temporary', :default => false, :optional => true},
- REWRITEENABLED => {:type => ::Thrift::Types::BOOL, :name => 'rewriteEnabled', :optional => true}
+ REWRITEENABLED => {:type => ::Thrift::Types::BOOL, :name => 'rewriteEnabled', :optional => true},
+ CREATIONMETADATA => {:type => ::Thrift::Types::MAP, :name => 'creationMetadata', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRUCT, :class => ::BasicTxnInfo}, :optional => true}
}
def struct_fields; FIELDS; end
@@ -2668,6 +2670,55 @@ class AddDynamicPartitions
::Thrift::Struct.generate_accessors self
end
+class BasicTxnInfo
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ ISNULL = 1
+ ID = 2
+ TIME = 3
+ TXNID = 4
+ DBNAME = 5
+ TABLENAME = 6
+ PARTITIONNAME = 7
+
+ FIELDS = {
+ ISNULL => {:type => ::Thrift::Types::BOOL, :name => 'isnull'},
+ ID => {:type => ::Thrift::Types::I64, :name => 'id', :optional => true},
+ TIME => {:type => ::Thrift::Types::I64, :name => 'time', :optional => true},
+ TXNID => {:type => ::Thrift::Types::I64, :name => 'txnid', :optional => true},
+ DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname', :optional => true},
+ TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tablename', :optional => true},
+ PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname', :optional => true}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field isnull is unset!') if @isnull.nil?
+ end
+
+ ::Thrift::Struct.generate_accessors self
+end
+
+class TxnsSnapshot
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ TXN_HIGH_WATER_MARK = 1
+ OPEN_TXNS = 2
+
+ FIELDS = {
+ TXN_HIGH_WATER_MARK => {:type => ::Thrift::Types::I64, :name => 'txn_high_water_mark'},
+ OPEN_TXNS => {:type => ::Thrift::Types::LIST, :name => 'open_txns', :element => {:type => ::Thrift::Types::I64}}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field txn_high_water_mark is unset!') unless @txn_high_water_mark
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field open_txns is unset!') unless @open_txns
+ end
+
+ ::Thrift::Struct.generate_accessors self
+end
+
class NotificationEventRequest
include ::Thrift::Struct, ::Thrift::Struct_Union
LASTEVENT = 1
@@ -3260,6 +3311,29 @@ class TableMeta
::Thrift::Struct.generate_accessors self
end
+class Materialization
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ MATERIALIZATIONTABLE = 1
+ TABLESUSED = 2
+ INVALIDATIONTIME = 3
+
+ FIELDS = {
+ MATERIALIZATIONTABLE => {:type => ::Thrift::Types::STRUCT, :name => 'materializationTable', :class => ::Table},
+ TABLESUSED => {:type => ::Thrift::Types::SET, :name => 'tablesUsed', :element => {:type => ::Thrift::Types::STRING}},
+ INVALIDATIONTIME => {:type => ::Thrift::Types::I64, :name => 'invalidationTime'}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field materializationTable is unset!') unless @materializationTable
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tablesUsed is unset!') unless @tablesUsed
+ raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field invalidationTime is unset!') unless @invalidationTime
+ end
+
+ ::Thrift::Struct.generate_accessors self
+end
+
class WMResourcePlan
include ::Thrift::Struct, ::Thrift::Struct_Union
NAME = 1
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
index 182cc37..a788c08 100644
--- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
+++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb
@@ -495,6 +495,22 @@ module ThriftHiveMetastore
raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_tables_by_type failed: unknown result')
end
+ def get_materialized_views_for_rewriting(db_name)
+ send_get_materialized_views_for_rewriting(db_name)
+ return recv_get_materialized_views_for_rewriting()
+ end
+
+ def send_get_materialized_views_for_rewriting(db_name)
+ send_message('get_materialized_views_for_rewriting', Get_materialized_views_for_rewriting_args, :db_name => db_name)
+ end
+
+ def recv_get_materialized_views_for_rewriting()
+ result = receive_message(Get_materialized_views_for_rewriting_result)
+ return result.success unless result.success.nil?
+ raise result.o1 unless result.o1.nil?
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_materialized_views_for_rewriting failed: unknown result')
+ end
+
def get_table_meta(db_patterns, tbl_patterns, tbl_types)
send_get_table_meta(db_patterns, tbl_patterns, tbl_types)
return recv_get_table_meta()
@@ -594,6 +610,24 @@ module ThriftHiveMetastore
raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_table_objects_by_name_req failed: unknown result')
end
+ def get_materialization_invalidation_info(dbname, tbl_names)
+ send_get_materialization_invalidation_info(dbname, tbl_names)
+ return recv_get_materialization_invalidation_info()
+ end
+
+ def send_get_materialization_invalidation_info(dbname, tbl_names)
+ send_message('get_materialization_invalidation_info', Get_materialization_invalidation_info_args, :dbname => dbname, :tbl_names => tbl_names)
+ end
+
+ def recv_get_materialization_invalidation_info()
+ result = receive_message(Get_materialization_invalidation_info_result)
+ return result.success unless result.success.nil?
+ raise result.o1 unless result.o1.nil?
+ raise result.o2 unless result.o2.nil?
+ raise result.o3 unless result.o3.nil?
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_materialization_invalidation_info failed: unknown result')
+ end
+
def get_table_names_by_filter(dbname, filter, max_tables)
send_get_table_names_by_filter(dbname, filter, max_tables)
return recv_get_table_names_by_filter()
@@ -2512,6 +2546,36 @@ module ThriftHiveMetastore
return
end
+ def get_last_completed_transaction_for_tables(db_names, table_names, txns_snapshot)
+ send_get_last_completed_transaction_for_tables(db_names, table_names, txns_snapshot)
+ return recv_get_last_completed_transaction_for_tables()
+ end
+
+ def send_get_last_completed_transaction_for_tables(db_names, table_names, txns_snapshot)
+ send_message('get_last_completed_transaction_for_tables', Get_last_completed_transaction_for_tables_args, :db_names => db_names, :table_names => table_names, :txns_snapshot => txns_snapshot)
+ end
+
+ def recv_get_last_completed_transaction_for_tables()
+ result = receive_message(Get_last_completed_transaction_for_tables_result)
+ return result.success unless result.success.nil?
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_last_completed_transaction_for_tables failed: unknown result')
+ end
+
+ def get_last_completed_transaction_for_table(db_name, table_name, txns_snapshot)
+ send_get_last_completed_transaction_for_table(db_name, table_name, txns_snapshot)
+ return recv_get_last_completed_transaction_for_table()
+ end
+
+ def send_get_last_completed_transaction_for_table(db_name, table_name, txns_snapshot)
+ send_message('get_last_completed_transaction_for_table', Get_last_completed_transaction_for_table_args, :db_name => db_name, :table_name => table_name, :txns_snapshot => txns_snapshot)
+ end
+
+ def recv_get_last_completed_transaction_for_table()
+ result = receive_message(Get_last_completed_transaction_for_table_result)
+ return result.success unless result.success.nil?
+ raise ::Thrift::ApplicationException.new(::Thrift::ApplicationException::MISSING_RESULT, 'get_last_completed_transaction_for_table failed: unknown result')
+ end
+
def get_next_notification(rqst)
send_get_next_notification(rqst)
return recv_get_next_notification()
@@ -3389,6 +3453,17 @@ module ThriftHiveMetastore
write_result(result, oprot, 'get_tables_by_type', seqid)
end
+ def process_get_materialized_views_for_rewriting(seqid, iprot, oprot)
+ args = read_args(iprot, Get_materialized_views_for_rewriting_args)
+ result = Get_materialized_views_for_rewriting_result.new()
+ begin
+ result.success = @handler.get_materialized_views_for_rewriting(args.db_name)
+ rescue ::MetaException => o1
+ result.o1 = o1
+ end
+ write_result(result, oprot, 'get_materialized_views_for_rewriting', seqid)
+ end
+
def process_get_table_meta(seqid, iprot, oprot)
args = read_args(iprot, Get_table_meta_args)
result = Get_table_meta_result.new()
@@ -3459,6 +3534,21 @@ module ThriftHiveMetastore
write_result(result, oprot, 'get_table_objects_by_name_req', seqid)
end
+ def process_get_materialization_invalidation_info(seqid, iprot, oprot)
+ args = read_args(iprot, Get_materialization_invalidation_info_args)
+ result = Get_materialization_invalidation_info_result.new()
+ begin
+ result.success = @handler.get_materialization_invalidation_info(args.dbname, args.tbl_names)
+ rescue ::MetaException => o1
+ result.o1 = o1
+ rescue ::InvalidOperationException => o2
+ result.o2 = o2
+ rescue ::UnknownDBException => o3
+ result.o3 = o3
+ end
+ write_result(result, oprot, 'get_materialization_invalidation_info', seqid)
+ end
+
def process_get_table_names_by_filter(seqid, iprot, oprot)
args = read_args(iprot, Get_table_names_by_filter_args)
result = Get_table_names_by_filter_result.new()
@@ -4896,6 +4986,20 @@ module ThriftHiveMetastore
write_result(result, oprot, 'add_dynamic_partitions', seqid)
end
+ def process_get_last_completed_transaction_for_tables(seqid, iprot, oprot)
+ args = read_args(iprot, Get_last_completed_transaction_for_tables_args)
+ result = Get_last_completed_transaction_for_tables_result.new()
+ result.success = @handler.get_last_completed_transaction_for_tables(args.db_names, args.table_names, args.txns_snapshot)
+ write_result(result, oprot, 'get_last_completed_transaction_for_tables', seqid)
+ end
+
+ def process_get_last_completed_transaction_for_table(seqid, iprot, oprot)
+ args = read_args(iprot, Get_last_completed_transaction_for_table_args)
+ result = Get_last_completed_transaction_for_table_result.new()
+ result.success = @handler.get_last_completed_transaction_for_table(args.db_name, args.table_name, args.txns_snapshot)
+ write_result(result, oprot, 'get_last_completed_transaction_for_table', seqid)
+ end
+
def process_get_next_notification(seqid, iprot, oprot)
args = read_args(iprot, Get_next_notification_args)
result = Get_next_notification_result.new()
@@ -6316,6 +6420,40 @@ module ThriftHiveMetastore
::Thrift::Struct.generate_accessors self
end
+ class Get_materialized_views_for_rewriting_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ DB_NAME = 1
+
+ FIELDS = {
+ DB_NAME => {:type => ::Thrift::Types::STRING, :name => 'db_name'}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Get_materialized_views_for_rewriting_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ SUCCESS = 0
+ O1 = 1
+
+ FIELDS = {
+ SUCCESS => {:type => ::Thrift::Types::LIST, :name => 'success', :element => {:type => ::Thrift::Types::STRING}},
+ O1 => {:type => ::Thrift::Types::STRUCT, :name => 'o1', :class => ::MetaException}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
class Get_table_meta_args
include ::Thrift::Struct, ::Thrift::Struct_Union
DB_PATTERNS = 1
@@ -6534,6 +6672,46 @@ module ThriftHiveMetastore
::Thrift::Struct.generate_accessors self
end
+ class Get_materialization_invalidation_info_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ DBNAME = 1
+ TBL_NAMES = 2
+
+ FIELDS = {
+ DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'},
+ TBL_NAMES => {:type => ::Thrift::Types::LIST, :name => 'tbl_names', :element => {:type => ::Thrift::Types::STRING}}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Get_materialization_invalidation_info_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ SUCCESS = 0
+ O1 = 1
+ O2 = 2
+ O3 = 3
+
+ FIELDS = {
+ SUCCESS => {:type => ::Thrift::Types::MAP, :name => 'success', :key => {:type => ::Thrift::Types::STRING}, :value => {:type => ::Thrift::Types::STRUCT, :class => ::Materialization}},
+ O1 => {:type => ::Thrift::Types::STRUCT, :name => 'o1', :class => ::MetaException},
+ O2 => {:type => ::Thrift::Types::STRUCT, :name => 'o2', :class => ::InvalidOperationException},
+ O3 => {:type => ::Thrift::Types::STRUCT, :name => 'o3', :class => ::UnknownDBException}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
class Get_table_names_by_filter_args
include ::Thrift::Struct, ::Thrift::Struct_Union
DBNAME = 1
@@ -10910,6 +11088,78 @@ module ThriftHiveMetastore
::Thrift::Struct.generate_accessors self
end
+ class Get_last_completed_transaction_for_tables_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ DB_NAMES = 1
+ TABLE_NAMES = 2
+ TXNS_SNAPSHOT = 3
+
+ FIELDS = {
+ DB_NAMES => {:type => ::Thrift::Types::LIST, :name => 'db_names', :element => {:type => ::Thrift::Types::STRING}},
+ TABLE_NAMES => {:type => ::Thrift::Types::LIST, :name => 'table_names', :element => {:type => ::Thrift::Types::STRING}},
+ TXNS_SNAPSHOT => {:type => ::Thrift::Types::STRUCT, :name => 'txns_snapshot', :class => ::TxnsSnapshot}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Get_last_completed_transaction_for_tables_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ SUCCESS = 0
+
+ FIELDS = {
+ SUCCESS => {:type => ::Thrift::Types::LIST, :name => 'success', :element => {:type => ::Thrift::Types::STRUCT, :class => ::BasicTxnInfo}}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Get_last_completed_transaction_for_table_args
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ DB_NAME = 1
+ TABLE_NAME = 2
+ TXNS_SNAPSHOT = 3
+
+ FIELDS = {
+ DB_NAME => {:type => ::Thrift::Types::STRING, :name => 'db_name'},
+ TABLE_NAME => {:type => ::Thrift::Types::STRING, :name => 'table_name'},
+ TXNS_SNAPSHOT => {:type => ::Thrift::Types::STRUCT, :name => 'txns_snapshot', :class => ::TxnsSnapshot}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
+ class Get_last_completed_transaction_for_table_result
+ include ::Thrift::Struct, ::Thrift::Struct_Union
+ SUCCESS = 0
+
+ FIELDS = {
+ SUCCESS => {:type => ::Thrift::Types::STRUCT, :name => 'success', :class => ::BasicTxnInfo}
+ }
+
+ def struct_fields; FIELDS; end
+
+ def validate
+ end
+
+ ::Thrift::Struct.generate_accessors self
+ end
+
class Get_next_notification_args
include ::Thrift::Struct, ::Thrift::Struct_Union
RQST = 1
@@ -10975,7 +11225,7 @@ module ThriftHiveMetastore
class Get_notification_events_count_args
include ::Thrift::Struct, ::Thrift::Struct_Union
- RQST = -1
+ RQST = 1
FIELDS = {
RQST => {:type => ::Thrift::Types::STRUCT, :name => 'rqst', :class => ::NotificationEventsCountRequest}
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index a1eeb29..0683440 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -652,7 +652,8 @@ public class HiveMetaStore extends ThriftHiveMetastore {
return ms;
}
- private TxnStore getTxnHandler() {
+ @Override
+ public TxnStore getTxnHandler() {
TxnStore txn = threadLocalTxn.get();
if (txn == null) {
txn = TxnUtils.getTxnStore(conf);
@@ -1499,6 +1500,7 @@ public class HiveMetaStore extends ThriftHiveMetastore {
tbl.getParameters().get(hive_metastoreConstants.DDL_TIME) == null) {
tbl.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
}
+
if (primaryKeys == null && foreignKeys == null
&& uniqueConstraints == null && notNullConstraints == null) {
ms.createTable(tbl);
@@ -2511,6 +2513,11 @@ public class HiveMetaStore extends ThriftHiveMetastore {
return tables;
}
+ @Override
+ public Map<String, Materialization> get_materialization_invalidation_info(final String dbName, final List<String> tableNames) {
+ return MaterializationsInvalidationCache.get().getMaterializationInvalidationInfo(dbName, tableNames);
+ }
+
private void assertClientHasCapability(ClientCapabilities client,
ClientCapability value, String what, String call) throws MetaException {
if (!doesClientHaveCapability(client, value)) {
@@ -4451,6 +4458,28 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
@Override
+ public List<String> get_materialized_views_for_rewriting(final String dbname)
+ throws MetaException {
+ startFunction("get_materialized_views_for_rewriting", ": db=" + dbname);
+
+ List<String> ret = null;
+ Exception ex = null;
+ try {
+ ret = getMS().getMaterializedViewsForRewriting(dbname);
+ } catch (Exception e) {
+ ex = e;
+ if (e instanceof MetaException) {
+ throw (MetaException) e;
+ } else {
+ throw newMetaException(e);
+ }
+ } finally {
+ endFunction("get_materialized_views_for_rewriting", ret != null, ex);
+ }
+ return ret;
+ }
+
+ @Override
public List<String> get_all_tables(final String dbname) throws MetaException {
startFunction("get_all_tables", ": db=" + dbname);
@@ -6922,6 +6951,19 @@ public class HiveMetaStore extends ThriftHiveMetastore {
}
@Override
+ public List<BasicTxnInfo> get_last_completed_transaction_for_tables(
+ List<String> dbNames, List<String> tableNames, TxnsSnapshot txnsSnapshot)
+ throws TException {
+ return getTxnHandler().getLastCompletedTransactionForTables(dbNames, tableNames, txnsSnapshot);
+ }
+
+ @Override
+ public BasicTxnInfo get_last_completed_transaction_for_table(String dbName, String tableName, TxnsSnapshot txnsSnapshot)
+ throws TException {
+ return getTxnHandler().getLastCompletedTransactionForTable(dbName, tableName, txnsSnapshot);
+ }
+
+ @Override
public NotificationEventsCountResponse get_notification_events_count(NotificationEventsCountRequest rqst)
throws TException {
try {
@@ -7803,6 +7845,10 @@ public class HiveMetaStore extends ThriftHiveMetastore {
HMSHandler baseHandler = new HiveMetaStore.HMSHandler("new db based metaserver", conf,
false);
IHMSHandler handler = newRetryingHMSHandler(baseHandler, conf);
+
+ // Initialize materializations invalidation cache
+ MaterializationsInvalidationCache.get().init(handler.getMS(), handler.getTxnHandler());
+
TServerSocket serverSocket;
if (useSasl) {
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
index 16d08b1..53a8669 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
@@ -30,6 +30,7 @@ import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -44,23 +45,21 @@ import java.util.NoSuchElementException;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.security.PrivilegedExceptionAction;
import javax.security.auth.login.LoginException;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.ObjectPair;
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
@@ -159,6 +158,8 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
// instantiate the metastore server handler directly instead of connecting
// through the network
client = HiveMetaStore.newRetryingHMSHandler("hive client", this.conf, true);
+ // Initialize materializations invalidation cache (only for local metastore)
+ MaterializationsInvalidationCache.get().init(((IHMSHandler) client).getMS(), ((IHMSHandler) client).getTxnHandler());
isConnected = true;
snapshotActiveConf();
return;
@@ -1398,6 +1399,14 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
/** {@inheritDoc} */
@Override
+ public Map<String, Materialization> getMaterializationsInvalidationInfo(String dbName, List<String> viewNames)
+ throws MetaException, InvalidOperationException, UnknownDBException, TException {
+ return client.get_materialization_invalidation_info(
+ dbName, filterHook.filterTableNames(dbName, viewNames));
+ }
+
+ /** {@inheritDoc} */
+ @Override
public List<String> listTableNamesByFilter(String dbName, String filter, short maxTables)
throws MetaException, TException, InvalidOperationException, UnknownDBException {
return filterHook.filterTableNames(dbName,
@@ -1431,7 +1440,19 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
@Override
public List<String> getTables(String dbname, String tablePattern, TableType tableType) throws MetaException {
try {
- return filterHook.filterTableNames(dbname, client.get_tables_by_type(dbname, tablePattern, tableType.toString()));
+ return filterHook.filterTableNames(dbname,
+ client.get_tables_by_type(dbname, tablePattern, tableType.toString()));
+ } catch (Exception e) {
+ MetaStoreUtils.logAndThrowMetaException(e);
+ }
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public List<String> getMaterializedViewsForRewriting(String dbname) throws MetaException {
+ try {
+ return filterHook.filterTableNames(dbname, client.get_materialized_views_for_rewriting(dbname));
} catch (Exception e) {
MetaStoreUtils.logAndThrowMetaException(e);
}
@@ -2145,6 +2166,25 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
}
@Override
+ public List<BasicTxnInfo> getLastCompletedTransactionForTables(
+ List<String> dbNames, List<String> tableNames, ValidTxnList txnList)
+ throws TException {
+ TxnsSnapshot txnsSnapshot = new TxnsSnapshot();
+ txnsSnapshot.setTxn_high_water_mark(txnList.getHighWatermark());
+ txnsSnapshot.setOpen_txns(Arrays.asList(ArrayUtils.toObject(txnList.getInvalidTransactions())));
+ return client.get_last_completed_transaction_for_tables(dbNames, tableNames, txnsSnapshot);
+ }
+
+ @Override
+ public BasicTxnInfo getLastCompletedTransactionForTable(String dbName, String tableName, ValidTxnList txnList)
+ throws TException {
+ TxnsSnapshot txnsSnapshot = new TxnsSnapshot();
+ txnsSnapshot.setTxn_high_water_mark(txnList.getHighWatermark());
+ txnsSnapshot.setOpen_txns(Arrays.asList(ArrayUtils.toObject(txnList.getInvalidTransactions())));
+ return client.get_last_completed_transaction_for_table(dbName, tableName, txnsSnapshot);
+ }
+
+ @Override
public ValidTxnList getValidTxns() throws TException {
return TxnUtils.createValidReadTxnList(client.get_open_txns(), 0);
}
@@ -2749,4 +2789,5 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable {
request.setDrop(shouldDrop);
client.create_or_drop_wm_trigger_to_pool_mapping(request);
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java
index 85bdc4d..e6de001 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IHMSHandler.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.metastore;
+import java.util.List;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -25,8 +27,7 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
-
-import java.util.List;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
/**
* An interface wrapper for HMSHandler. This interface contains methods that need to be
@@ -51,6 +52,12 @@ public interface IHMSHandler extends ThriftHiveMetastore.Iface, Configurable {
RawStore getMS() throws MetaException;
/**
+ * Get a reference to the underlying TxnStore.
+ * @return the TxnStore instance.
+ */
+ TxnStore getTxnHandler();
+
+ /**
* Get a reference to Hive's warehouse object (the class that does all the physical operations).
* @return Warehouse instance.
*/
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
index 4d68217..3261405 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.hive.metastore;
-import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
@@ -35,6 +33,7 @@ import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
import org.apache.hadoop.hive.metastore.api.CmRecycleRequest;
import org.apache.hadoop.hive.metastore.api.CmRecycleResponse;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -67,6 +66,7 @@ import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.InvalidPartitionException;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.Materialization;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.MetadataPpdResult;
import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
@@ -75,8 +75,8 @@ import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
import org.apache.hadoop.hive.metastore.api.NotificationEvent;
import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
-import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.PartitionEventType;
@@ -86,8 +86,6 @@ import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
-import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
-import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.api.Role;
import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
@@ -105,8 +103,11 @@ import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMMapping;
import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMTrigger;
import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
import org.apache.hadoop.hive.metastore.utils.ObjectPair;
import org.apache.thrift.TException;
@@ -203,6 +204,17 @@ public interface IMetaStoreClient {
throws MetaException, TException, UnknownDBException;
/**
+ * Get materialized views that have rewriting enabled.
+ * @param dbName Name of the database to fetch materialized views from.
+ * @return List of materialized view names.
+ * @throws MetaException
+ * @throws TException
+ * @throws UnknownDBException
+ */
+ List<String> getMaterializedViewsForRewriting(String dbName)
+ throws MetaException, TException, UnknownDBException;
+
+ /**
* For quick GetTablesOperation
*/
List<TableMeta> getTableMeta(String dbPatterns, String tablePatterns, List<String> tableTypes)
@@ -429,6 +441,12 @@ public interface IMetaStoreClient {
throws MetaException, InvalidOperationException, UnknownDBException, TException;
/**
+ * Returns the invalidation information for the materialized views given as input.
+ */
+ Map<String, Materialization> getMaterializationsInvalidationInfo(String dbName, List<String> viewNames)
+ throws MetaException, InvalidOperationException, UnknownDBException, TException;
+
+ /**
* @param tableName
* @param dbName
* @param partVals
@@ -1317,6 +1335,24 @@ public interface IMetaStoreClient {
throws MetaException, TException;
/**
+ * Get the last completed transaction for the given tables. Although transactions in Hive
+ * might happen concurrently, the order is based on the actual commit to the metastore
+ * table holding the completed transactions.
+ */
+ @InterfaceAudience.LimitedPrivate({"HCatalog"})
+ List<BasicTxnInfo> getLastCompletedTransactionForTables(List<String> dbNames, List<String> tableNames, ValidTxnList txnList)
+ throws TException;
+
+ /**
+ * Get the last completed transaction for the given table. Although transactions in Hive
+ * might happen concurrently, the order is based on the actual commit to the metastore
+ * table holding the completed transactions.
+ */
+ @InterfaceAudience.LimitedPrivate({"HCatalog"})
+ BasicTxnInfo getLastCompletedTransactionForTable(String dbName, String tableName, ValidTxnList txnList)
+ throws TException;
+
+ /**
* Get a structure that details valid transactions.
* @return list of valid transactions
* @throws TException
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java
new file mode 100644
index 0000000..5e7ee40
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationInvalidationInfo.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+/**
+ * Contains information about the invalidation of a materialization,
+ * including the materialization name, the tables that it uses, and
+ * the invalidation time, i.e., the first moment t0 after the
+ * materialization was created at which one of the tables that it uses
+ * was modified.
+ */
+@SuppressWarnings("serial")
+public class MaterializationInvalidationInfo extends Materialization {
+
+ private AtomicLong invalidationTime;
+
+ public MaterializationInvalidationInfo(Table materializationTable, Set<String> tablesUsed) {
+ super(materializationTable, tablesUsed, 0);
+ this.invalidationTime = new AtomicLong(0);
+ }
+
+ public boolean compareAndSetInvalidationTime(long expect, long update) {
+ boolean success = invalidationTime.compareAndSet(expect, update);
+ if (success) {
+ super.setInvalidationTime(update);
+ }
+ return success;
+ }
+
+ public long getInvalidationTime() {
+ return invalidationTime.get();
+ }
+
+ public void setInvalidationTime(long invalidationTime) {
+ throw new UnsupportedOperationException("You should call compareAndSetInvalidationTime instead");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
new file mode 100644
index 0000000..3e1bb1e
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsInvalidationCache.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * This cache keeps information in memory about the table modifications so materialized views
+ * can verify their invalidation time, i.e., the moment after materialization on which the
+ * first transaction to the tables they used happened. This information is kept in memory
+ * to check the invalidation quickly. However, we store enough information in the metastore
+ * to bring this cache up if the metastore is restarted or would crashed. This cache lives
+ * in the metastore server.
+ */
+public final class MaterializationsInvalidationCache {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MaterializationsInvalidationCache.class);
+
+ /* Singleton */
+ private static final MaterializationsInvalidationCache SINGLETON = new MaterializationsInvalidationCache();
+
+ /* Key is the database name. Each value is a map from the unique view qualified name to
+ * the materialization invalidation info. This invalidation object contains information
+ * such as the tables used by the materialized view or the invalidation time, i.e., first
+ * modification of the tables used by materialized view after the view was created. */
+ private final ConcurrentMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>> materializations =
+ new ConcurrentHashMap<String, ConcurrentMap<String, MaterializationInvalidationInfo>>();
+
+ /*
+ * Key is a qualified table name. The value is a (sorted) tree set (supporting concurrent
+ * modifications) that will keep the modifications for a given table in the order that they
+ * happen. This is useful to quickly check the invalidation time for a given materialized
+ * view.
+ */
+ private final ConcurrentMap<String, ConcurrentSkipListSet<TableModificationKey>> tableModifications =
+ new ConcurrentHashMap<String, ConcurrentSkipListSet<TableModificationKey>>();
+
+ /* Whether the cache has been initialized or not. */
+ private boolean initialized;
+ /* Store to answer calls not related to transactions. */
+ private RawStore store;
+ /* Store to answer calls related to transactions. */
+ private TxnStore txnStore;
+
+ private MaterializationsInvalidationCache() {
+ }
+
+ /**
+ * Get instance of MaterializationsInvalidationCache.
+ *
+ * @return the singleton
+ */
+ public static MaterializationsInvalidationCache get() {
+ return SINGLETON;
+ }
+
+ /**
+ * Initialize the invalidation cache.
+ *
+ * The method is synchronized because we want to avoid initializing the invalidation cache
+ * multiple times in embedded mode. This will not happen when we run the metastore remotely
+ * as the method is called only once.
+ */
+ public synchronized void init(final RawStore store, final TxnStore txnStore) {
+ this.store = store;
+ this.txnStore = txnStore;
+
+ if (!initialized) {
+ this.initialized = true;
+ ExecutorService pool = Executors.newCachedThreadPool();
+ pool.submit(new Loader());
+ pool.shutdown();
+ }
+ }
+
+ private class Loader implements Runnable {
+ @Override
+ public void run() {
+ try {
+ for (String dbName : store.getAllDatabases()) {
+ for (Table mv : store.getTableObjectsByName(dbName, store.getTables(dbName, null, TableType.MATERIALIZED_VIEW))) {
+ addMaterializedView(mv, ImmutableSet.copyOf(mv.getCreationMetadata().keySet()), OpType.LOAD);
+ }
+ }
+ LOG.info("Initialized materializations invalidation cache");
+ } catch (Exception e) {
+ LOG.error("Problem connecting to the metastore when initializing the view registry");
+ }
+ }
+ }
+
+ /**
+ * Adds a newly created materialized view to the cache.
+ *
+ * @param materializedViewTable the materialized view
+ * @param tablesUsed tables used by the materialized view
+ */
+ public void createMaterializedView(Table materializedViewTable, Set<String> tablesUsed) {
+ addMaterializedView(materializedViewTable, tablesUsed, OpType.CREATE);
+ }
+
+ /**
+ * Method to call when materialized view is modified.
+ *
+ * @param materializedViewTable the materialized view
+ * @param tablesUsed tables used by the materialized view
+ */
+ public void alterMaterializedView(Table materializedViewTable, Set<String> tablesUsed) {
+ addMaterializedView(materializedViewTable, tablesUsed, OpType.ALTER);
+ }
+
+ /**
+ * Adds the materialized view to the cache.
+ *
+ * @param materializedViewTable the materialized view
+ * @param tablesUsed tables used by the materialized view
+ */
+ private void addMaterializedView(Table materializedViewTable, Set<String> tablesUsed, OpType opType) {
+ // We are going to create the map for each view in the given database
+ ConcurrentMap<String, MaterializationInvalidationInfo> cq =
+ new ConcurrentHashMap<String, MaterializationInvalidationInfo>();
+ final ConcurrentMap<String, MaterializationInvalidationInfo> prevCq = materializations.putIfAbsent(
+ materializedViewTable.getDbName(), cq);
+ if (prevCq != null) {
+ cq = prevCq;
+ }
+ // Start the process to add materialization to the cache
+ // Before loading the materialization in the cache, we need to update some
+ // important information in the registry to account for rewriting invalidation
+ for (String qNameTableUsed : tablesUsed) {
+ // First we insert a new tree set to keep table modifications, unless it already exists
+ ConcurrentSkipListSet<TableModificationKey> modificationsTree =
+ new ConcurrentSkipListSet<TableModificationKey>();
+ final ConcurrentSkipListSet<TableModificationKey> prevModificationsTree = tableModifications.putIfAbsent(
+ qNameTableUsed, modificationsTree);
+ if (prevModificationsTree != null) {
+ modificationsTree = prevModificationsTree;
+ }
+ // We obtain the access time to the table when the materialized view was created.
+ // This is a map from table fully qualified name to last modification before MV creation.
+ BasicTxnInfo e = materializedViewTable.getCreationMetadata().get(qNameTableUsed);
+ if (e.isIsnull()) {
+ // This can happen when the materialized view was created on non-transactional tables
+ // with rewrite disabled but then it was enabled by alter statement
+ continue;
+ }
+ final TableModificationKey lastModificationBeforeCreation =
+ new TableModificationKey(e.getId(), e.getTime());
+ modificationsTree.add(lastModificationBeforeCreation);
+ if (opType == OpType.LOAD) {
+ // If we are not creating the MV at this instant, but instead it was created previously
+ // and we are loading it into the cache, we need to go through the transaction logs and
+ // check if the MV is still valid.
+ try {
+ String[] names = qNameTableUsed.split("\\.");
+ BasicTxnInfo e2 = txnStore.getFirstCompletedTransactionForTableAfterCommit(
+ names[0], names[1], lastModificationBeforeCreation.id);
+ if (!e2.isIsnull()) {
+ modificationsTree.add(new TableModificationKey(e2.getId(), e2.getTime()));
+ // We do not need to do anything more for current table, as we detected
+ // a modification event that was in the metastore.
+ continue;
+ }
+ } catch (MetaException ex) {
+ LOG.debug("Materialized view " +
+ Warehouse.getQualifiedName(materializedViewTable.getDbName(), materializedViewTable.getTableName()) +
+ " ignored; error loading view into invalidation cache", ex);
+ return;
+ }
+ }
+ }
+ if (opType == OpType.CREATE || opType == OpType.ALTER) {
+ // You store the materialized view
+ cq.put(materializedViewTable.getTableName(),
+ new MaterializationInvalidationInfo(materializedViewTable, tablesUsed));
+ } else {
+ // For LOAD, you only add it if it does exist as you might be loading an outdated MV
+ cq.putIfAbsent(materializedViewTable.getTableName(),
+ new MaterializationInvalidationInfo(materializedViewTable, tablesUsed));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cached materialized view for rewriting in invalidation cache: " +
+ Warehouse.getQualifiedName(materializedViewTable.getDbName(), materializedViewTable.getTableName()));
+ }
+ }
+
+ /**
+ * This method is called when a table is modified. That way we can keep a track of the
+ * invalidation for the MVs that use that table.
+ */
+ public void notifyTableModification(String dbName, String tableName,
+ long eventId, long newModificationTime) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Notification for table {} in database {} received -> id: {}, time: {}",
+ tableName, dbName, eventId, newModificationTime);
+ }
+ ConcurrentSkipListSet<TableModificationKey> modificationsTree =
+ new ConcurrentSkipListSet<TableModificationKey>();
+ final ConcurrentSkipListSet<TableModificationKey> prevModificationsTree =
+ tableModifications.putIfAbsent(Warehouse.getQualifiedName(dbName, tableName), modificationsTree);
+ if (prevModificationsTree != null) {
+ modificationsTree = prevModificationsTree;
+ }
+ modificationsTree.add(new TableModificationKey(eventId, newModificationTime));
+ }
+
+ /**
+ * Removes the materialized view from the cache.
+ *
+ * @param materializedViewTable the materialized view to remove
+ */
+ public void dropMaterializedView(Table materializedViewTable) {
+ dropMaterializedView(materializedViewTable.getDbName(), materializedViewTable.getTableName());
+ }
+
+ public void dropMaterializedView(String dbName, String tableName) {
+ materializations.get(dbName).remove(tableName);
+ }
+
+ /**
+ * Returns the materialized views in the cache for the given database.
+ *
+ * @param dbName the database
+ * @return the collection of materialized views, or the empty collection if none
+ */
+ public Map<String, Materialization> getMaterializationInvalidationInfo(
+ String dbName, List<String> materializationNames) {
+ if (materializations.get(dbName) != null) {
+ ImmutableMap.Builder<String, Materialization> m = ImmutableMap.builder();
+ for (String materializationName : materializationNames) {
+ MaterializationInvalidationInfo materialization =
+ materializations.get(dbName).get(materializationName);
+ if (materialization == null) {
+ LOG.debug("Materialization {} skipped as there is no information "
+ + "in the invalidation cache about it", materializationName);
+ continue;
+ }
+ long invalidationTime = getInvalidationTime(materialization);
+ // We need to check whether previous value is zero, as data modification
+ // in another table used by the materialized view might have modified
+ // the value too
+ boolean modified = materialization.compareAndSetInvalidationTime(0L, invalidationTime);
+ while (!modified) {
+ long currentInvalidationTime = materialization.getInvalidationTime();
+ if (invalidationTime < currentInvalidationTime) {
+ // It was set by other table modification, but it was after this table modification
+ // hence we need to set it
+ modified = materialization.compareAndSetInvalidationTime(currentInvalidationTime, invalidationTime);
+ } else {
+ // Nothing to do
+ modified = true;
+ }
+ }
+ m.put(materializationName, materialization);
+ }
+ Map<String, Materialization> result = m.build();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrieved the following materializations from the invalidation cache: {}", result);
+ }
+ return result;
+ }
+ return ImmutableMap.of();
+ }
+
+ private long getInvalidationTime(MaterializationInvalidationInfo materialization) {
+ long firstModificationTimeAfterCreation = 0L;
+ for (String qNameTableUsed : materialization.getTablesUsed()) {
+ BasicTxnInfo e = materialization.getMaterializationTable().getCreationMetadata().get(qNameTableUsed);
+ if (e == null) {
+ // This can happen when the materialized view was created on non-transactional tables
+ // with rewrite disabled but then it was enabled by alter statement
+ return Long.MIN_VALUE;
+ }
+ final TableModificationKey lastModificationBeforeCreation =
+ new TableModificationKey(e.getId(), e.getTime());
+ final TableModificationKey post = tableModifications.get(qNameTableUsed)
+ .higher(lastModificationBeforeCreation);
+ if (post != null) {
+ if (firstModificationTimeAfterCreation == 0L ||
+ post.time < firstModificationTimeAfterCreation) {
+ firstModificationTimeAfterCreation = post.time;
+ }
+ }
+ }
+ return firstModificationTimeAfterCreation;
+ }
+
+ private static class TableModificationKey implements Comparable<TableModificationKey> {
+ private long id;
+ private long time;
+
+ private TableModificationKey(long id, long time) {
+ this.id = id;
+ this.time = time;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if(this == obj) {
+ return true;
+ }
+ if((obj == null) || (obj.getClass() != this.getClass())) {
+ return false;
+ }
+ TableModificationKey tableModificationKey = (TableModificationKey) obj;
+ return id == tableModificationKey.id && time == tableModificationKey.time;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 7;
+ hash = 31 * hash + Long.hashCode(id);
+ hash = 31 * hash + Long.hashCode(time);
+ return hash;
+ }
+
+ @Override
+ public int compareTo(TableModificationKey other) {
+ if (id == other.id) {
+ return Long.compare(time, other.time);
+ }
+ return Long.compare(id, other.id);
+ }
+
+ @Override
+ public String toString() {
+ return "TableModificationKey{" + id + "," + time + "}";
+ }
+ }
+
+ private enum OpType {
+ CREATE,
+ LOAD,
+ ALTER
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
index 14653b4..786732f 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
@@ -42,8 +42,8 @@ import javax.jdo.Query;
import javax.jdo.Transaction;
import javax.jdo.datastore.JDOConnection;
-import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.AggregateStatsCache.AggrColStats;
import org.apache.hadoop.hive.metastore.api.AggrStats;
@@ -416,6 +416,36 @@ class MetaStoreDirectSql {
}
/**
+ * Get table names by using direct SQL queries.
+ *
+ * @param dbName Metastore database namme
+ * @param tableType Table type, or null if we want to get all tables
+ * @return list of table names
+ */
+ public List<String> getMaterializedViewsForRewriting(String db_name) throws MetaException {
+ List<String> ret = new ArrayList<String>();
+ String queryText = "SELECT " + TBLS + ".\"TBL_NAME\""
+ + " FROM " + TBLS + " "
+ + " INNER JOIN " + DBS + " ON " + TBLS + ".\"DB_ID\" = " + DBS + ".\"DB_ID\" "
+ + " WHERE " + DBS + ".\"NAME\" = ? AND " + TBLS + ".\"TBL_TYPE\" = ? " ;
+
+ List<String> pms = new ArrayList<String>();
+ pms.add(db_name);
+ pms.add(TableType.MATERIALIZED_VIEW.toString());
+
+ Query<?> queryParams = pm.newQuery("javax.jdo.query.SQL", queryText);
+ List<Object[]> sqlResult = ensureList(executeWithArray(
+ queryParams, pms.toArray(), queryText));
+
+ if (!sqlResult.isEmpty()) {
+ for (Object[] line : sqlResult) {
+ ret.add(extractSqlString(line[0]));
+ }
+ }
+ return ret;
+ }
+
+ /**
* Gets partitions by using direct SQL queries.
* Note that batching is not needed for this method - list of names implies the batch size;
* @param dbName Metastore db name.
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index c9ff295..5438b22 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -20,14 +20,7 @@ package org.apache.hadoop.hive.metastore;
import static org.apache.commons.lang.StringUtils.join;
import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
-import org.apache.hadoop.hive.metastore.api.WMMapping;
-import org.apache.hadoop.hive.metastore.model.MWMMapping;
-import org.apache.hadoop.hive.metastore.model.MWMMapping.EntityType;
-import org.apache.hadoop.hive.metastore.api.WMPool;
-import org.apache.hadoop.hive.metastore.model.MWMPool;
-import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
@@ -58,6 +51,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
+
import javax.jdo.JDOCanRetryException;
import javax.jdo.JDODataStoreException;
import javax.jdo.JDOException;
@@ -71,7 +65,9 @@ import javax.jdo.datastore.DataStoreCache;
import javax.jdo.datastore.JDOConnection;
import javax.jdo.identity.IntIdentity;
import javax.sql.DataSource;
+
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -83,6 +79,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.metastore.MetaStoreDirectSql.SqlFilterForPushdown;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -133,6 +130,10 @@ import org.apache.hadoop.hive.metastore.api.Type;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
+import org.apache.hadoop.hive.metastore.api.WMMapping;
+import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WMPoolTrigger;
import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
import org.apache.hadoop.hive.metastore.api.WMResourcePlanStatus;
import org.apache.hadoop.hive.metastore.api.WMTrigger;
@@ -173,6 +174,9 @@ import org.apache.hadoop.hive.metastore.model.MTableColumnStatistics;
import org.apache.hadoop.hive.metastore.model.MTablePrivilege;
import org.apache.hadoop.hive.metastore.model.MType;
import org.apache.hadoop.hive.metastore.model.MVersionTable;
+import org.apache.hadoop.hive.metastore.model.MWMMapping;
+import org.apache.hadoop.hive.metastore.model.MWMMapping.EntityType;
+import org.apache.hadoop.hive.metastore.model.MWMPool;
import org.apache.hadoop.hive.metastore.model.MWMResourcePlan;
import org.apache.hadoop.hive.metastore.model.MWMResourcePlan.Status;
import org.apache.hadoop.hive.metastore.model.MWMTrigger;
@@ -184,7 +188,10 @@ import org.apache.hadoop.hive.metastore.utils.FileUtils;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.ObjectPair;
+import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TJSONProtocol;
import org.datanucleus.AbstractNucleusContext;
import org.datanucleus.ClassLoaderResolver;
import org.datanucleus.ClassLoaderResolverImpl;
@@ -196,11 +203,13 @@ import org.datanucleus.store.scostore.Store;
import org.datanucleus.util.WeakValueMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/**
@@ -1109,6 +1118,7 @@ public class ObjectStore implements RawStore, Configurable {
boolean commited = false;
try {
openTransaction();
+
MTable mtbl = convertToMTable(tbl);
pm.makePersistent(mtbl);
@@ -1131,6 +1141,12 @@ public class ObjectStore implements RawStore, Configurable {
} finally {
if (!commited) {
rollbackTransaction();
+ } else {
+ if (MetaStoreUtils.isMaterializedViewTable(tbl)) {
+ // Add to the invalidation cache
+ MaterializationsInvalidationCache.get().createMaterializedView(
+ tbl, tbl.getCreationMetadata().keySet());
+ }
}
}
}
@@ -1171,12 +1187,14 @@ public class ObjectStore implements RawStore, Configurable {
@Override
public boolean dropTable(String dbName, String tableName) throws MetaException,
NoSuchObjectException, InvalidObjectException, InvalidInputException {
+ boolean materializedView = false;
boolean success = false;
try {
openTransaction();
MTable tbl = getMTable(dbName, tableName);
pm.retrieve(tbl);
if (tbl != null) {
+ materializedView = TableType.MATERIALIZED_VIEW.toString().equals(tbl.getTableType());
// first remove all the grants
List<MTablePrivilege> tabGrants = listAllTableGrants(dbName, tableName);
if (CollectionUtils.isNotEmpty(tabGrants)) {
@@ -1220,6 +1238,10 @@ public class ObjectStore implements RawStore, Configurable {
} finally {
if (!success) {
rollbackTransaction();
+ } else {
+ if (materializedView) {
+ MaterializationsInvalidationCache.get().dropMaterializedView(dbName, tableName);
+ }
}
}
return success;
@@ -1353,6 +1375,30 @@ public class ObjectStore implements RawStore, Configurable {
}
@Override
+ public List<String> getMaterializedViewsForRewriting(String dbName)
+ throws MetaException, NoSuchObjectException {
+ final String db_name = normalizeIdentifier(dbName);
+ boolean commited = false;
+ Query<?> query = null;
+ List<String> tbls = null;
+ try {
+ openTransaction();
+ dbName = normalizeIdentifier(dbName);
+ query = pm.newQuery(MTable.class, "database.name == db && tableType == tt"
+ + " && rewriteEnabled == re");
+ query.declareParameters("java.lang.String db, java.lang.String tt, boolean re");
+ query.setResult("tableName");
+ Collection<String> names = (Collection<String>) query.execute(
+ db_name, TableType.MATERIALIZED_VIEW.toString(), true);
+ tbls = new ArrayList<>(names);
+ commited = commitTransaction();
+ } finally {
+ rollbackAndCleanup(commited, query);
+ }
+ return tbls;
+ }
+
+ @Override
public int getDatabaseCount() throws MetaException {
return getObjectCount("name", MDatabase.class.getName());
}
@@ -1585,6 +1631,7 @@ public class ObjectStore implements RawStore, Configurable {
.getRetention(), convertToStorageDescriptor(mtbl.getSd()),
convertToFieldSchemas(mtbl.getPartitionKeys()), convertMap(mtbl.getParameters()),
mtbl.getViewOriginalText(), mtbl.getViewExpandedText(), tableType);
+ t.setCreationMetadata(convertToCreationMetadata(mtbl.getCreationMetadata()));
t.setRewriteEnabled(mtbl.isRewriteEnabled());
return t;
}
@@ -1624,7 +1671,7 @@ public class ObjectStore implements RawStore, Configurable {
.getCreateTime(), tbl.getLastAccessTime(), tbl.getRetention(),
convertToMFieldSchemas(tbl.getPartitionKeys()), tbl.getParameters(),
tbl.getViewOriginalText(), tbl.getViewExpandedText(), tbl.isRewriteEnabled(),
- tableType);
+ convertToMCreationMetadata(tbl.getCreationMetadata()), tableType);
}
private List<MFieldSchema> convertToMFieldSchemas(List<FieldSchema> keys) {
@@ -1833,6 +1880,58 @@ public class ObjectStore implements RawStore, Configurable {
.getSkewedColValueLocationMaps()), sd.isStoredAsSubDirectories());
}
+ private Map<String, String> convertToMCreationMetadata(
+ Map<String, BasicTxnInfo> m) throws MetaException {
+ if (m == null) {
+ return null;
+ }
+ Map<String, String> r = new HashMap<>();
+ for (Entry<String, BasicTxnInfo> e : m.entrySet()) {
+ r.put(e.getKey(), serializeBasicTransactionInfo(e.getValue()));
+ }
+ return r;
+ }
+
+ private Map<String, BasicTxnInfo> convertToCreationMetadata(
+ Map<String, String> m) throws MetaException {
+ if (m == null) {
+ return null;
+ }
+ Map<String, BasicTxnInfo> r = new HashMap<>();
+ for (Entry<String, String> e : m.entrySet()) {
+ r.put(e.getKey(), deserializeBasicTransactionInfo(e.getValue()));
+ }
+ return r;
+ }
+
+ private String serializeBasicTransactionInfo(BasicTxnInfo entry)
+ throws MetaException {
+ if (entry.isIsnull()) {
+ return "";
+ }
+ try {
+ TSerializer serializer = new TSerializer(new TJSONProtocol.Factory());
+ return serializer.toString(entry, "UTF-8");
+ } catch (TException e) {
+ throw new MetaException("Error serializing object " + entry + ": " + e.toString());
+ }
+ }
+
+ private BasicTxnInfo deserializeBasicTransactionInfo(String s)
+ throws MetaException {
+ if (s.equals("")) {
+ return new BasicTxnInfo(true);
+ }
+ try {
+ TDeserializer deserializer = new TDeserializer(new TJSONProtocol.Factory());
+ BasicTxnInfo r = new BasicTxnInfo();
+ deserializer.deserialize(r, s, "UTF-8");
+ return r;
+ } catch (TException e) {
+ throw new MetaException("Error deserializing object " + s + ": " + e.toString());
+ }
+ }
+
@Override
public boolean addPartitions(String dbName, String tblName, List<Partition> parts)
throws InvalidObjectException, MetaException {
@@ -3563,6 +3662,7 @@ public class ObjectStore implements RawStore, Configurable {
public void alterTable(String dbname, String name, Table newTable)
throws InvalidObjectException, MetaException {
boolean success = false;
+ boolean registerCreationSignature = false;
try {
openTransaction();
name = normalizeIdentifier(name);
@@ -3598,12 +3698,23 @@ public class ObjectStore implements RawStore, Configurable {
oldt.setViewOriginalText(newt.getViewOriginalText());
oldt.setViewExpandedText(newt.getViewExpandedText());
oldt.setRewriteEnabled(newt.isRewriteEnabled());
+ registerCreationSignature = !MapUtils.isEmpty(newt.getCreationMetadata());
+ if (registerCreationSignature) {
+ oldt.setCreationMetadata(newt.getCreationMetadata());
+ }
// commit the changes
success = commitTransaction();
} finally {
if (!success) {
rollbackTransaction();
+ } else {
+ if (MetaStoreUtils.isMaterializedViewTable(newTable) &&
+ registerCreationSignature) {
+ // Add to the invalidation cache if the creation signature has changed
+ MaterializationsInvalidationCache.get().alterMaterializedView(
+ newTable, newTable.getCreationMetadata().keySet());
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index fa77f63..e6c7a58 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -175,6 +175,9 @@ public interface RawStore extends Configurable {
List<String> getTables(String dbName, String pattern, TableType tableType)
throws MetaException;
+ List<String> getMaterializedViewsForRewriting(String dbName)
+ throws MetaException, NoSuchObjectException;
+
List<TableMeta> getTableMeta(
String dbNames, String tableNames, List<String> tableTypes) throws MetaException;
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
index 421d1be..2d52e0e 100755
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/Warehouse.java
@@ -181,7 +181,11 @@ public class Warehouse {
}
public static String getQualifiedName(Table table) {
- return table.getDbName() + "." + table.getTableName();
+ return getQualifiedName(table.getDbName(), table.getTableName());
+ }
+
+ public static String getQualifiedName(String dbName, String tableName) {
+ return dbName + "." + tableName;
}
public static String getQualifiedName(Partition partition) {
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index e1be6b9..72c4a26 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -1082,6 +1082,12 @@ public class CachedStore implements RawStore, Configurable {
}
@Override
+ public List<String> getMaterializedViewsForRewriting(String dbName)
+ throws MetaException, NoSuchObjectException {
+ return rawStore.getMaterializedViewsForRewriting(dbName);
+ }
+
+ @Override
public List<TableMeta> getTableMeta(String dbNames, String tableNames, List<String> tableTypes)
throws MetaException {
// TODO Check if all required tables are allowed, if so, get it from cache
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
index 3759348..9c34928 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTable.java
@@ -35,6 +35,7 @@ public class MTable {
private String viewOriginalText;
private String viewExpandedText;
private boolean rewriteEnabled;
+ private Map<String, String> creationMetadata;
private String tableType;
public MTable() {}
@@ -55,8 +56,9 @@ public class MTable {
*/
public MTable(String tableName, MDatabase database, MStorageDescriptor sd, String owner,
int createTime, int lastAccessTime, int retention, List<MFieldSchema> partitionKeys,
- Map<String, String> parameters,
- String viewOriginalText, String viewExpandedText, boolean rewriteEnabled, String tableType) {
+ Map<String, String> parameters, String viewOriginalText, String viewExpandedText,
+ boolean rewriteEnabled, Map<String, String> creationMetadata,
+ String tableType) {
this.tableName = tableName;
this.database = database;
this.sd = sd;
@@ -69,6 +71,7 @@ public class MTable {
this.viewOriginalText = viewOriginalText;
this.viewExpandedText = viewExpandedText;
this.rewriteEnabled = rewriteEnabled;
+ this.creationMetadata = creationMetadata;
this.tableType = tableType;
}
@@ -171,6 +174,20 @@ public class MTable {
}
/**
+ * @return the metadata information related to a materialized view creation
+ */
+ public Map<String, String> getCreationMetadata() {
+ return creationMetadata;
+ }
+
+ /**
+ * @param creationMetadata the metadata information to set
+ */
+ public void setCreationMetadata(Map<String, String> creationMetadata) {
+ this.creationMetadata = creationMetadata;
+ }
+
+ /**
* @return the owner
*/
public String getOwner() {
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 756cb4c..e724723 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -91,7 +91,9 @@ public final class TxnDbUtil {
" CTC_TXNID bigint," +
" CTC_DATABASE varchar(128) NOT NULL," +
" CTC_TABLE varchar(128)," +
- " CTC_PARTITION varchar(767))");
+ " CTC_PARTITION varchar(767)," +
+ " CTC_ID bigint GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL," +
+ " CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)");
stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)");
stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
stmt.execute("CREATE TABLE HIVE_LOCKS (" +
http://git-wip-us.apache.org/repos/asf/hive/blob/57d909c3/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 06f49de..0c5b2ef 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -17,48 +17,109 @@
*/
package org.apache.hadoop.hive.metastore.txn;
-import com.google.common.annotations.VisibleForTesting;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TimeZone;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+
+import javax.sql.DataSource;
import org.apache.commons.dbcp.ConnectionFactory;
import org.apache.commons.dbcp.DriverManagerConnectionFactory;
import org.apache.commons.dbcp.PoolableConnectionFactory;
+import org.apache.commons.dbcp.PoolingDataSource;
import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.classification.RetrySemantics;
import org.apache.hadoop.hive.metastore.DatabaseProduct;
+import org.apache.hadoop.hive.metastore.MaterializationsInvalidationCache;
import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.TxnsSnapshot;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.datasource.BoneCPDataSourceProvider;
import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
import org.apache.hadoop.hive.metastore.datasource.HikariCPDataSourceProvider;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.metastore.utils.StringableMap;
+import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.dbcp.PoolingDataSource;
-
-import org.apache.commons.pool.impl.GenericObjectPool;
-import org.apache.hadoop.hive.metastore.api.*;
-import org.apache.hadoop.util.StringUtils;
-import javax.sql.DataSource;
-
-import java.io.PrintWriter;
-import java.nio.ByteBuffer;
-import java.sql.*;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
+import com.google.common.annotations.VisibleForTesting;
/**
* A handler to answer transaction related calls that come into the metastore
@@ -755,7 +816,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
// Move the record from txn_components into completed_txn_components so that the compactor
// knows where to look to compact.
- String s = "insert into COMPLETED_TXN_COMPONENTS select tc_txnid, tc_database, tc_table, " +
+ String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
+ "ctc_table, ctc_partition) select tc_txnid, tc_database, tc_table, " +
"tc_partition from TXN_COMPONENTS where tc_txnid = " + txnid;
LOG.debug("Going to execute insert <" + s + ">");
int modCount = 0;
@@ -776,6 +838,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
modCount = stmt.executeUpdate(s);
LOG.debug("Going to commit");
dbConn.commit();
+
+ // Update registry with modifications
+ s = "select ctc_database, ctc_table, ctc_id, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid;
+ rs = stmt.executeQuery(s);
+ if (rs.next()) {
+ LOG.debug("Going to register table modification in invalidation cache <" + s + ">");
+ MaterializationsInvalidationCache.get().notifyTableModification(
+ rs.getString(1), rs.getString(2), rs.getLong(3),
+ rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime());
+ }
+ close(rs);
+ dbConn.commit();
} catch (SQLException e) {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
@@ -791,6 +865,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
commitTxn(rqst);
}
}
+
@Override
@RetrySemantics.SafeToRetry
public void performWriteSetGC() {
@@ -836,6 +911,105 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
close(rs, stmt, dbConn);
}
}
+
+ /**
+ * Gets the information of the last transaction committed for the input table
+ * given the transaction snapshot provided.
+ */
+ @Override
+ public List<BasicTxnInfo> getLastCompletedTransactionForTables(
+ List<String> dbNames, List<String> tableNames, TxnsSnapshot txnsSnapshot) throws MetaException {
+ List<BasicTxnInfo> r = new ArrayList<>();
+ for (int i = 0; i < dbNames.size(); i++) {
+ r.add(getLastCompletedTransactionForTable(dbNames.get(i), tableNames.get(i), txnsSnapshot));
+ }
+ return r;
+ }
+
+ /**
+ * Gets the information of the last transaction committed for the input table
+ * given the transaction snapshot provided.
+ */
+ @Override
+ public BasicTxnInfo getLastCompletedTransactionForTable(
+ String inputDbName, String inputTableName, TxnsSnapshot txnsSnapshot) throws MetaException {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ stmt.setMaxRows(1);
+ String s = "select ctc_id, ctc_timestamp, ctc_txnid, ctc_database, ctc_table "
+ + "from COMPLETED_TXN_COMPONENTS "
+ + "where ctc_database=" + quoteString(inputDbName) + " and ctc_table=" + quoteString(inputTableName)
+ + " and ctc_txnid <= " + txnsSnapshot.getTxn_high_water_mark()
+ + (txnsSnapshot.getOpen_txns().isEmpty() ?
+ " " : " and ctc_txnid NOT IN(" + StringUtils.join(",", txnsSnapshot.getOpen_txns()) + ") ")
+ + "order by ctc_id desc";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to execute query <" + s + ">");
+ }
+ rs = stmt.executeQuery(s);
+ if(!rs.next()) {
+ return new BasicTxnInfo(true);
+ }
+ final BasicTxnInfo txnInfo = new BasicTxnInfo(false);
+ txnInfo.setId(rs.getLong(1));
+ txnInfo.setTime(rs.getTimestamp(2, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime());
+ txnInfo.setTxnid(rs.getLong(3));
+ txnInfo.setDbname(rs.getString(4));
+ txnInfo.setTablename(rs.getString(5));
+ return txnInfo;
+ } catch (SQLException ex) {
+ LOG.warn("getLastCompletedTransactionForTable failed due to " + getMessage(ex), ex);
+ throw new MetaException("Unable to retrieve commits information due to " + StringUtils.stringifyException(ex));
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ }
+
+ /**
+ * Gets the information of the first transaction for the given table
+ * after the transaction with the input id was committed (if any).
+ */
+ @Override
+ public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit(
+ String inputDbName, String inputTableName, long incrementalIdentifier)
+ throws MetaException {
+ Connection dbConn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
+ try {
+ dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+ stmt = dbConn.createStatement();
+ stmt.setMaxRows(1);
+ String s = "select ctc_id, ctc_timestamp, ctc_txnid, ctc_database, ctc_table "
+ + "from COMPLETED_TXN_COMPONENTS "
+ + "where ctc_database=" + quoteString(inputDbName) + " and ctc_table=" + quoteString(inputTableName)
+ + " and ctc_id > " + incrementalIdentifier + " order by ctc_id asc";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Going to execute query <" + s + ">");
+ }
+ rs = stmt.executeQuery(s);
+ if(!rs.next()) {
+ return new BasicTxnInfo(true);
+ }
+ final BasicTxnInfo txnInfo = new BasicTxnInfo(false);
+ txnInfo.setId(rs.getLong(1));
+ txnInfo.setTime(rs.getTimestamp(2, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime());
+ txnInfo.setTxnid(rs.getLong(3));
+ txnInfo.setDbname(rs.getString(4));
+ txnInfo.setTablename(rs.getString(5));
+ return txnInfo;
+ } catch (SQLException ex) {
+ LOG.warn("getLastCompletedTransactionForTable failed due to " + getMessage(ex), ex);
+ throw new MetaException("Unable to retrieve commits information due to " + StringUtils.stringifyException(ex));
+ } finally {
+ close(rs, stmt, dbConn);
+ }
+ }
+
/**
* As much as possible (i.e. in absence of retries) we want both operations to be done on the same
* connection (but separate transactions). This avoid some flakiness in BONECP where if you