Posted to commits@impala.apache.org by st...@apache.org on 2021/05/19 01:36:52 UTC
[impala] 02/05: Revert "Revert "IMPALA-10613: Standup HMS thrift server in Catalog""
This is an automated email from the ASF dual-hosted git repository.
stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 5c85bf5c54fb1e0dffbe01b3f70c289de3700a66
Author: Vihang Karajgaonkar <vi...@apache.org>
AuthorDate: Wed May 12 07:30:48 2021 -0700
Revert "Revert "IMPALA-10613: Standup HMS thrift server in Catalog""
This reverts commit 829d1a6ab4643b07877fb410971b67f1b1d1b045.
Additionally, this patch includes a couple of addendums related
to the original change:
1. A bug fix for the original reverted commit, which used
isSetGetFileMetadata instead of isGetFileMetadata
(see https://gerrit.cloudera.org/#/c/17330/).
2. A fix for intermittent failures in CatalogHmsFileMetadataTest
caused by the catalogd's HMS client requiring
"hive.metastore.execute.setugi" to be set to false.
Change-Id: Icbe93f3ae4efd585d4b0092a9ac7081b0b2c1c44
Reviewed-on: http://gerrit.cloudera.org:8080/17429
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Reviewed-by: Aman Sinha <am...@cloudera.com>
---
be/src/catalog/catalog-server.cc | 10 +
be/src/common/global-flags.cc | 5 +
be/src/util/backend-gflag-util.cc | 8 +
common/thrift/BackendGflags.thrift | 8 +
common/thrift/CatalogService.thrift | 5 +
.../org/apache/impala/compat/MetastoreShim.java | 2 +-
.../apache/impala/catalog/CatalogHmsAPIHelper.java | 592 +++++
.../impala/catalog/CatalogServiceCatalog.java | 44 +-
.../GetPartialCatalogObjectRequestBuilder.java | 147 ++
.../java/org/apache/impala/catalog/HdfsTable.java | 6 +-
.../impala/catalog/ParallelFileMetadataLoader.java | 19 +-
.../main/java/org/apache/impala/catalog/Table.java | 11 +-
.../catalog/metastore/CatalogHmsClientUtils.java | 159 ++
.../catalog/metastore/CatalogMetastoreServer.java | 289 +++
.../metastore/CatalogMetastoreServiceHandler.java | 152 ++
.../catalog/metastore/ICatalogMetastoreServer.java | 47 +
.../catalog/metastore/MetastoreServiceHandler.java | 2736 ++++++++++++++++++++
.../metastore/NoOpCatalogMetastoreServer.java | 44 +
.../org/apache/impala/service/BackendConfig.java | 22 +
.../metastore/CatalogHmsFileMetadataTest.java | 169 ++
.../metastore/EnableCatalogdHmsCacheFlagTest.java | 130 +
.../impala/testutil/CatalogServiceTestCatalog.java | 89 +-
tests/common/impala_test_suite.py | 27 +
tests/custom_cluster/test_metastore_service.py | 707 +++++
24 files changed, 5406 insertions(+), 22 deletions(-)
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index d354deb..f7e7b35 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -98,6 +98,16 @@ DEFINE_int32(max_wait_time_for_sync_ddl_s, 0, "Maximum time (in seconds) until "
"coordinators might have applied the changes caused due to the ddl.");
DECLARE_string(debug_actions);
+DEFINE_bool(start_hms_server, false, "When set to true, the catalog server starts an "
+ "HMS server at the port specified by the hms_port flag.");
+
+DEFINE_int32(hms_port, 5899, "If start_hms_server is set to true, this "
+ "configuration specifies the port number at which it is started.");
+
+DEFINE_bool(fallback_to_hms_on_errors, true, "This configuration is only used if "
+ "start_hms_server is true. It determines whether the Catalog should fall back "
+ "to the backing HMS service if there are errors while processing the HMS request.");
+
DECLARE_string(state_store_host);
DECLARE_int32(state_store_subscriber_port);
DECLARE_int32(state_store_port);
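For context, an illustrative catalogd invocation using these new flags (flag names
are the ones defined above; the values shown are only examples, not recommendations):

  catalogd --start_hms_server=true --hms_port=5899 --fallback_to_hms_on_errors=true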
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 795f2e4..5a4ca3c 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -346,6 +346,11 @@ DEFINE_bool(enable_incremental_metadata_updates, true,
"propagated as a whole object in the statestore topic updates. Note that legacy "
"coordinators can apply incremental or full table updates so don't need this flag.");
+DEFINE_bool(enable_catalogd_hms_cache, true,
+ "If true, response for the HMS APIs that are implemented in catalogd will be served "
+ "from catalogd. If this flag is false or a given API is not implemented in catalogd,"
+ " it will be redirected to HMS.");
+
// ++========================++
// || Startup flag graveyard ||
// ++========================++
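This flag composes with the catalog-server flags above: the embedded HMS server can
keep running while every request is redirected to the backing HMS instead of being
served from the catalogd cache. An illustrative invocation:

  catalogd --start_hms_server=true --enable_catalogd_hms_cache=false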
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index a218c06..a15b633 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -91,6 +91,10 @@ DECLARE_int64(topic_update_tbl_max_wait_time_ms);
DECLARE_int32(catalog_max_lock_skipped_topic_updates);
DECLARE_string(scratch_dirs);
DECLARE_int32(max_wait_time_for_sync_ddl_s);
+DECLARE_bool(start_hms_server);
+DECLARE_int32(hms_port);
+DECLARE_bool(fallback_to_hms_on_errors);
+DECLARE_bool(enable_catalogd_hms_cache);
// HS2 SAML2.0 configuration
// Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -285,6 +289,10 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
cfg.__set_scratch_dirs(FLAGS_scratch_dirs);
cfg.__set_max_wait_time_for_sync_ddl_s(FLAGS_max_wait_time_for_sync_ddl_s);
cfg.__set_allow_ordinals_in_having(FLAGS_allow_ordinals_in_having);
+ cfg.__set_start_hms_server(FLAGS_start_hms_server);
+ cfg.__set_hms_port(FLAGS_hms_port);
+ cfg.__set_fallback_to_hms_on_errors(FLAGS_fallback_to_hms_on_errors);
+ cfg.__set_enable_catalogd_hms_cache(FLAGS_enable_catalogd_hms_cache);
return Status::OK();
}
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 2d25efe..c483469 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -197,4 +197,12 @@ struct TBackendGflags {
86: required i32 max_wait_time_for_sync_ddl_s
87: required bool allow_ordinals_in_having
+
+ 88: required bool start_hms_server
+
+ 89: required i32 hms_port
+
+ 90: required bool fallback_to_hms_on_errors
+
+ 91: required bool enable_catalogd_hms_cache
}
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 852c0ad..d70e4b6 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -375,6 +375,11 @@ struct TTableInfoSelector {
// with the HMS table which it has and triggers a reload in case it doesn't match.
// this field is only used when valid_write_ids is set, otherwise it is ignored
10: optional i64 table_id = -1
+
+ // If this is set to true, the response contains column statistics for all columns,
+ // and the list provided in want_stats_for_column_names is ignored.
+ 11: optional bool want_stats_for_all_columns
}
// Returned information about a particular partition.
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index de7c586..523ca21 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -190,7 +190,7 @@ public class MetastoreShim {
* Constant variable that stores engine value needed to store / access
* Impala column statistics.
*/
- protected static final String IMPALA_ENGINE = "impala";
+ public static final String IMPALA_ENGINE = "impala";
/**
* Wrapper around MetaStoreUtils.validateName() to deal with added arguments.
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
new file mode 100644
index 0000000..ddde537
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
@@ -0,0 +1,592 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.FileMetadata;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.ObjectDictionary;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.analysis.TableName;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.metastore.CatalogMetastoreServiceHandler;
+import org.apache.impala.common.Pair;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.thrift.CatalogLookupStatus;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TPartialPartitionInfo;
+import org.apache.impala.util.AcidUtils;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TException;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class with helper methods that fetch information from the
+ * Catalog in order to serve the HMS APIs.
+ */
+public class CatalogHmsAPIHelper {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CatalogHmsAPIHelper.class);
+ // We log at INFO level or above if the file-metadata load time on the fallback
+ // path took more than 100 msec.
+ // TODO: change this to a per-directory value based on empirical averages
+ // for various filesystems.
+ private static final long FALLBACK_FILE_MD_TIME_WARN_THRESHOLD_MS = 100;
+
+ //TODO Make this individually configurable
+ private static final ExecutorService fallbackFdLoaderPool = Executors
+ .newFixedThreadPool(20,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("HMS-Filemetadata-loader-%d").build());
+
+ public static final String IMPALA_TNETWORK_ADDRESSES = "impala:TNetworkAddress";
+
+ /**
+ * Helper method to serve a get_table_req() API via the catalog. If isGetColumnStats()
+ * is true in the request, we check that the engine is Impala.
+ *
+ * @param catalog The catalog instance which caches the table.
+ * @param getTableRequest The GetTableRequest object passed by the caller.
+ * @return GetTableResult for the given request.
+ */
+ public static GetTableResult getTableReq(CatalogServiceCatalog catalog,
+ String defaultCatalogName, GetTableRequest getTableRequest)
+ throws CatalogException, NoSuchObjectException, MetaException {
+ checkCatalogName(getTableRequest.getCatName(), defaultCatalogName);
+ checkCondition(getTableRequest.getDbName() != null,
+ "Database name is null");
+ checkCondition(getTableRequest.getTblName() != null,
+ "Table name is null");
+ String dbName = getTableRequest.getDbName();
+ String tblName = getTableRequest.getTblName();
+ TableName tableName = new TableName(dbName, tblName);
+ GetPartialCatalogObjectRequestBuilder reqBuilder =
+ new GetPartialCatalogObjectRequestBuilder()
+ .db(dbName)
+ .tbl(tblName);
+ // Make sure if getColumnStats() is true, the processor engine is set to Impala.
+ if (getTableRequest.isGetColumnStats()) {
+ checkCondition(getTableRequest.getEngine() != null, "Column stats are requested "
+ + "but engine is not set in the request.");
+ checkCondition(
+ MetastoreShim.IMPALA_ENGINE.equalsIgnoreCase(getTableRequest.getEngine()),
+ "Unsupported engine " + getTableRequest.getEngine()
+ + " while requesting column statistics of the table " + dbName + "."
+ + tblName);
+ reqBuilder.wantStatsForAllColums();
+ }
+ // if the client request has getFileMetadata flag set,
+ // we retrieve file descriptors too
+ if (getTableRequest.isGetFileMetadata()) {
+ reqBuilder.wantFiles();
+ }
+ if (getTableRequest.isSetValidWriteIdList()) {
+ reqBuilder.writeId(getTableRequest.getValidWriteIdList());
+ }
+ //TODO add table id in the request when client passes it. Also, looks like
+ // when HIVE-24662 is resolved, we may not need the table id in this request.
+ TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(
+ catalog, reqBuilder.build(), dbName, tblName, "processing get_table_req");
+ // As of writing this code, we know that the table which is returned in the response
+ // is a copy of table stored in the catalogd. Should this assumption change in the
+ // future, we must make sure that we take a deepCopy() of the table before modifying
+ // it later below.
+ Table retTable = response.table_info.hms_table;
+ checkCondition(
+ !AcidUtils.isTransactionalTable(retTable.getParameters()) || getTableRequest
+ .isSetValidWriteIdList(), "Table " + dbName + "." + tblName
+ + " is transactional but it was requested without providing"
+ + " validWriteIdList");
+ if (getTableRequest.isGetColumnStats()) {
+ List<ColumnStatisticsObj> columnStatisticsObjList =
+ response.table_info.column_stats;
+ checkCondition(columnStatisticsObjList != null,
+ "Catalog returned a null ColumnStatisticsObj for table %s", tableName);
+ // Set the table level column statistics for this table.
+ ColumnStatistics columnStatistics = MetastoreShim.createNewHiveColStats();
+ columnStatistics.setStatsDesc(new ColumnStatisticsDesc(true, dbName, tblName));
+ for (ColumnStatisticsObj colsStatsObj : columnStatisticsObjList) {
+ columnStatistics.addToStatsObj(colsStatsObj);
+ }
+ retTable.setColStats(columnStatistics);
+ }
+ if (getTableRequest.isGetFileMetadata()) {
+ // set the file-metadata in the response
+ checkCondition(response.table_info.partitions != null,
+ "File metadata was not returned by catalog");
+ checkCondition(response.table_info.partitions.size() == 1,
+ "Retrieving file-metadata for partitioned tables must use partition level"
+ + " fetch APIs");
+ FileMetadata fileMetadata = new FileMetadata();
+ for (THdfsFileDesc fd : response.table_info.partitions.get(0).file_descriptors) {
+ fileMetadata.addToData(fd.file_desc_data);
+ }
+ retTable.setFileMetadata(fileMetadata);
+ retTable.setDictionary(getSerializedNetworkAddress(
+ response.table_info.network_addresses));
+ }
+ return new GetTableResult(retTable);
+ }
+
+ /**
+ * Util method to issue the {@link CatalogServiceCatalog#getPartialCatalogObject} and
+ * parse the response to do some sanity checks.
+ */
+ private static TGetPartialCatalogObjectResponse getPartialCatalogObjResponse(
+ CatalogServiceCatalog catalog, TGetPartialCatalogObjectRequest request,
+ String dbName, String tblName, String mesg)
+ throws CatalogException, NoSuchObjectException {
+ TGetPartialCatalogObjectResponse response = catalog
+ .getPartialCatalogObject(request, mesg);
+ checkCondition(response != null, "Catalog returned a null response");
+ if (response.lookup_status == CatalogLookupStatus.DB_NOT_FOUND) {
+ throw new NoSuchObjectException("Database " + dbName + " not found");
+ }
+ if (response.lookup_status == CatalogLookupStatus.TABLE_NOT_FOUND) {
+ throw new NoSuchObjectException("Table " + dbName + "." + tblName + " not found");
+ }
+ checkCondition(response.lookup_status != CatalogLookupStatus.TABLE_NOT_LOADED,
+ "Could not load table %s.%s", dbName, tblName);
+ checkCondition(response.table_info.hms_table != null,
+ "Catalog returned a null table for %s.%s", dbName, tblName);
+ return response;
+ }
+
+ /**
+ * Helper method to get the partitions filtered by a Hive expression.
+ *
+ * @param catalog The catalog instance which caches the table.
+ * @param defaultCatalog Currently, custom catalog names are not supported.
+ * If the catalog name in the request is not null, it must be the same as defaultCatalog.
+ * @param request The request object as received from the HMS client.
+ * @param expressionProxy The PartitionExpressionProxy instance which is used to
+ * deserialize the filter expression.
+ * @return PartitionsByExprResult for the given request.
+ * @throws CatalogException If there are any internal processing errors (eg. table does
+ * not exist in Catalog cache) within the context of Catalog.
+ * @throws MetaException If the expression cannot be deserialized using the given
+ * PartitionExpressionProxy.
+ */
+ public static PartitionsByExprResult getPartitionsByExpr(CatalogServiceCatalog catalog,
+ String defaultCatalog, PartitionsByExprRequest request,
+ PartitionExpressionProxy expressionProxy)
+ throws CatalogException, NoSuchObjectException, MetaException {
+ checkCatalogName(request.getCatName(), defaultCatalog);
+ checkCondition(request.getDbName() != null, "Database name is null");
+ checkCondition(request.getTblName() != null, "Table name is null");
+ String dbName = request.getDbName();
+ String tblName = request.getTblName();
+ TableName tableName = new TableName(dbName, tblName);
+ // TODO we are currently fetching all the partitions metadata here since catalog
+ // loads the entire table. Once we have fine-grained loading, we should just get
+ // the partitionNames here and then get the partition metadata after the pruning
+ // is done.
+ GetPartialCatalogObjectRequestBuilder catalogReq =
+ new GetPartialCatalogObjectRequestBuilder()
+ .db(dbName)
+ .tbl(tblName)
+ .wantPartitions();
+ // set the validWriteIdList if available
+ if (request.isSetValidWriteIdList()) {
+ catalogReq.writeId(request.getValidWriteIdList());
+ }
+ // set the table id if available
+ if (request.isSetId()) {
+ catalogReq.tableId(request.getId());
+ }
+ TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
+ catalogReq.build(), dbName, tblName,
+ "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_EXPR);
+ checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
+ "%s is not a partitioned table", tableName);
+ // create a mapping of the Partition name to the Partition so that we can return the
+ // filtered partitions later.
+ Map<String, TPartialPartitionInfo> partitionNameToPartInfo = new HashMap<>();
+ for (TPartialPartitionInfo partInfo : response.getTable_info().getPartitions()) {
+ partitionNameToPartInfo.put(partInfo.getName(), partInfo);
+ }
+ List<String> filteredPartNames = Lists.newArrayList(partitionNameToPartInfo.keySet());
+ Stopwatch st = Stopwatch.createStarted();
+ boolean hasUnknownPartitions = expressionProxy
+ .filterPartitionsByExpr(response.table_info.hms_table.getPartitionKeys(),
+ request.getExpr(), request.getDefaultPartitionName(), filteredPartNames);
+ LOG.info("{}/{} partitions were selected for table {} after expression evaluation."
+ + " Time taken: {} msec.", filteredPartNames.size(),
+ partitionNameToPartInfo.size(), tableName,
+ st.stop().elapsed(TimeUnit.MILLISECONDS));
+ List<Partition> filteredPartitions = Lists
+ .newArrayListWithCapacity(filteredPartNames.size());
+ // TODO add file-metadata to Partitions. This would require changes to the HMS API
+ // so that the request can pass a flag to send back file-metadata.
+ for (String partName : filteredPartNames) {
+ // Note that we are not using String.format arguments here since String.format()
+ // throws a java.util.MissingFormatArgumentException for special characters like
+ // '%3A' which could be present in the PartitionName.
+ checkCondition(partitionNameToPartInfo.containsKey(partName),
+ "Could not find partition id for partition name " + partName);
+ TPartialPartitionInfo partInfo = partitionNameToPartInfo.get(partName);
+ checkCondition(partInfo != null,
+ "Catalog did not return the partition " + partName + " for table " + tableName,
+ partName, tableName);
+ filteredPartitions.add(partitionNameToPartInfo.get(partName).getHms_partition());
+ }
+ // confirm that the number of partitions equals the number of filtered names
+ checkCondition(filteredPartNames.size() == filteredPartitions.size(),
+ "Unexpected number of partitions received. Expected %s got %s",
+ filteredPartNames.size(), filteredPartitions.size());
+ PartitionsByExprResult result = new PartitionsByExprResult();
+ result.setPartitions(filteredPartitions);
+ result.setHasUnknownPartitions(hasUnknownPartitions);
+ return result;
+ }
+
+ /**
+ * Throws a {@code CatalogException} with the given message string and arguments if the
+ * condition is false.
+ *
+ * @param condition throws CatalogException when the condition is false.
+ * @param msg message compatible with the {@code String.format()} format.
+ * @param args args for {@code String.format()} to be used as the message in the
+ * thrown exception.
+ * @throws CatalogException if the condition is false.
+ */
+ private static void checkCondition(boolean condition, String msg, Object... args)
+ throws CatalogException {
+ if (condition) return;
+ throw new CatalogException(String.format(msg, args));
+ }
+
+ /**
+ * Catalog service does not support {@link org.apache.hadoop.hive.metastore.api.Catalog}
+ * currently. This method validates the given catalog name against the default
+ * catalog name and throws a {@link MetaException} if it doesn't match.
+ */
+ public static void checkCatalogName(String catalogName, String defaultCatalogName)
+ throws MetaException {
+ if (catalogName == null) return;
+ try {
+ checkCondition(defaultCatalogName != null, "Default catalog name is null");
+ checkCondition(defaultCatalogName.equalsIgnoreCase(catalogName),
+ "Catalog service does not support non-default catalogs. Expected %s got %s",
+ defaultCatalogName, catalogName);
+ } catch (CatalogException ex) {
+ LOG.error(ex.getMessage(), ex);
+ throw new MetaException(ex.getMessage());
+ }
+ }
+
+ /**
+ * Helper method to return a list of Partitions filtered by names. Currently partition
+ * level column statistics cannot be requested and this method throws an exception if
+ * the request has get_col_stats parameter set. If the request has {@code
+ * getFileMetadata} set the returned response will include the file-metadata as well.
+ */
+ public static GetPartitionsByNamesResult getPartitionsByNames(
+ CatalogServiceCatalog catalog, Configuration serverConf,
+ GetPartitionsByNamesRequest request)
+ throws CatalogException, NoSuchObjectException, MetaException {
+ // in case of GetPartitionsByNamesRequest there is no explicit catalogName
+ // field and hence the catalog name is prepended to the database name.
+ // TODO this needs to be fixed in HMS APIs so that we have explicit catalog field.
+ String catAnddbName = request.getDb_name();
+ String tblName = request.getTbl_name();
+ checkCondition(!Strings.isNullOrEmpty(catAnddbName),
+ "Database name is empty or null");
+ checkCondition(!Strings.isNullOrEmpty(tblName), "Table name is empty or null");
+ String[] parsedCatDbName = MetaStoreUtils
+ .parseDbName(request.getDb_name(), serverConf);
+ checkCondition(parsedCatDbName.length == 2,
+ "Unexpected error during parsing the catalog and database name %s", catAnddbName);
+ checkCatalogName(parsedCatDbName[0], MetaStoreUtils.getDefaultCatalog(serverConf));
+ String dbName = parsedCatDbName[1];
+ //TODO partition granularity column statistics are not supported in catalogd currently
+ checkCondition(!request.isGet_col_stats(),
+ "Partition level column statistics are not supported in catalog");
+ GetPartialCatalogObjectRequestBuilder requestBuilder =
+ new GetPartialCatalogObjectRequestBuilder()
+ .db(dbName)
+ .tbl(tblName)
+ .wantPartitions();
+ // get the file metadata if the request has it set.
+ if (request.isGetFileMetadata()) {
+ requestBuilder.wantFiles();
+ }
+ if (request.isSetValidWriteIdList()) {
+ requestBuilder.writeId(request.getValidWriteIdList());
+ }
+ //TODO add table id in the request when client passes it. Also, looks like
+ // when HIVE-24662 is resolved, we may not need the table id in this request.
+ TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
+ requestBuilder.build(), dbName, tblName,
+ "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_NAMES);
+ checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
+ "%s.%s is not a partitioned table", dbName, tblName);
+ checkCondition(
+ !request.isGetFileMetadata() || response.table_info.network_addresses != null,
+ "Network addresses were not returned for %s.%s", dbName, tblName);
+ checkCondition(
+ !AcidUtils.isTransactionalTable(response.table_info.hms_table.getParameters())
+ || request.isSetValidWriteIdList(), "Table " + dbName + "." + tblName
+ + " is a transactional table but partitions were requested without "
+ + "providing validWriteIdList");
+ // create a set of the requested partition names so that we can filter the
+ // partitions returned by the catalog.
+ Set<String> requestedNames = new HashSet<>(request.getNames());
+ List<Partition> retPartitions = new ArrayList<>();
+ // filter by names
+ for (TPartialPartitionInfo partInfo : response.getTable_info().getPartitions()) {
+ if (requestedNames.contains(partInfo.getName())) {
+ // as of writing this code, we know that the partInfo object has a copy of the
+ // partition which is stored in the catalogd instead of a reference to the
+ // actual partition object of the catalogd. Hence modifying it below is okay.
+ // Should this assumption change in the future, we must make a deepCopy of the
+ // partition object here to make sure that we don't modify the state of the
+ // partition in the catalogd.
+ Partition part = partInfo.getHms_partition();
+ if (request.isGetFileMetadata()) {
+ FileMetadata fileMetadata = new FileMetadata();
+ checkCondition(partInfo.file_descriptors != null,
+ "Catalog did not return file descriptors for partition %s of table %s.%s",
+ partInfo.getName(), dbName, tblName);
+ for (THdfsFileDesc fd : partInfo.file_descriptors) {
+ fileMetadata.addToData(fd.file_desc_data);
+ }
+ part.setFileMetadata(fileMetadata);
+ }
+ retPartitions.add(part);
+ }
+ }
+ GetPartitionsByNamesResult result = new GetPartitionsByNamesResult(retPartitions);
+ if (request.isGetFileMetadata()) {
+ result.setDictionary(getSerializedNetworkAddress(
+ response.table_info.network_addresses));
+ }
+ return result;
+ }
+
+ /**
+ * Util method to serialize a given list of {@link TNetworkAddress} into a {@link
+ * ObjectDictionary}.
+ */
+ public static ObjectDictionary getSerializedNetworkAddress(
+ List<TNetworkAddress> networkAddresses) throws CatalogException {
+ checkCondition(networkAddresses != null, "Network addresses is null");
+ // we assume that this method is only called when we want to return the file-metadata
+ // in the response, in which case there should always be a valid ObjectDictionary
+ // returned.
+ ObjectDictionary result = new ObjectDictionary(Maps.newHashMap());
+ if (networkAddresses.isEmpty()) return result;
+ List<ByteBuffer> serializedAddresses = new ArrayList<>();
+ TSerializer serializer =
+ new TSerializer(new TCompactProtocol.Factory());
+ for (TNetworkAddress networkAddress : networkAddresses) {
+ byte[] serializedNetAddress;
+ try {
+ serializedNetAddress = serializer.serialize(networkAddress);
+ } catch (TException tException) {
+ throw new CatalogException(
+ "Could not serialize network address " + networkAddress.hostname + ":"
+ + networkAddress.port, tException);
+ }
+ serializedAddresses.add(ByteBuffer.wrap(serializedNetAddress));
+ }
+ result.putToValues(IMPALA_TNETWORK_ADDRESSES, serializedAddresses);
+ return result;
+ }
+
+ /**
+ * This method computes the file-metadata directly from the filesystem and sets it in
+ * the Table object of the provided GetTableResult. The file-metadata is loaded using a
+ * common static thread pool and is consistent with the given ValidTxnList and
+ * ValidWriteIdList.
+ *
+ * @throws MetaException in case there were errors while loading file-metadata.
+ */
+ public static void loadAndSetFileMetadataFromFs(@Nullable ValidTxnList validTxnList,
+ @Nullable ValidWriteIdList writeIdList, GetTableResult result)
+ throws MetaException {
+ try {
+ Stopwatch sw = Stopwatch.createStarted();
+ Table tbl = result.getTable();
+ checkCondition(tbl != null, "Table is null");
+ checkCondition(tbl.getSd() != null && tbl.getSd().getLocation() != null,
+ "Cannot get the location of table %s.%s", tbl.getDbName(), tbl.getTableName());
+ Path tblPath = new Path(tbl.getSd().getLocation());
+ // since this table doesn't exist in catalogd we compute the network addresses
+ // for the files which are being returned.
+ ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+ FileMetadataLoader fmLoader = new FileMetadataLoader(tblPath, true,
+ Collections.EMPTY_LIST, hostIndex, validTxnList, writeIdList);
+ boolean success = getFileMetadata(Arrays.asList(fmLoader));
+ checkCondition(success,
+ "Could not load file-metadata for table %s.%s. See catalogd log for details",
+ tbl.getDbName(), tbl.getTableName());
+ FileMetadata fileMetadata = new FileMetadata();
+ for (FileDescriptor fd : fmLoader.getLoadedFds()) {
+ fileMetadata.addToData(fd.toThrift().file_desc_data);
+ }
+ tbl.setFileMetadata(fileMetadata);
+ tbl.setDictionary(getSerializedNetworkAddress(hostIndex.getList()));
+ long timeTaken = sw.stop().elapsed(TimeUnit.MILLISECONDS);
+ if (timeTaken > FALLBACK_FILE_MD_TIME_WARN_THRESHOLD_MS) {
+ LOG.warn("Loading the filemetadata for table {}.{} on the fallback path. Time "
+ + "taken: {} msec", tbl.getDbName(), tbl.getTableName(), timeTaken);
+ } else {
+ LOG.debug("Loading the filemetadata for table {}.{} on the fallback path. Time "
+ + "taken: {} msec", tbl.getDbName(), tbl.getTableName(), timeTaken);
+ }
+ } catch (Exception ex) {
+ LOG.error("Unexpected error when loading filemetadata", ex);
+ throw new MetaException(
+ "Could not load filemetadata. Cause " + ex.getMessage());
+ }
+ }
+
+ /**
+ * Sets the file metadata for the given partitions from the file-system. In case of
+ * transactional tables it uses the given ValidTxnList and ValidWriteIdList to compute
+ * the snapshot of the file-metadata.
+ */
+ public static void loadAndSetFileMetadataFromFs(@Nullable ValidTxnList txnList,
+ @Nullable ValidWriteIdList writeIdList,
+ GetPartitionsByNamesResult getPartsResult) throws MetaException {
+ if (getPartsResult.getPartitionsSize() == 0) return;
+ final String dbName = getPartsResult.getPartitions().get(0).getDbName();
+ final String tblName = getPartsResult.getPartitions().get(0).getTableName();
+ try {
+ Stopwatch sw = Stopwatch.createStarted();
+ Map<Partition, FileMetadataLoader> fileMdLoaders = new HashMap<>();
+ ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+ for (Partition part : getPartsResult.getPartitions()) {
+ checkCondition(part.getSd() != null && part.getSd().getLocation() != null,
+ "Could not get the location for partition %s of table %s.%s",
+ part.getValues(), part.getDbName(), part.getTableName());
+ Path partPath = new Path(part.getSd().getLocation());
+ fileMdLoaders.put(part,
+ new FileMetadataLoader(partPath, true, Collections.EMPTY_LIST, hostIndex,
+ txnList, writeIdList));
+ }
+ boolean success = getFileMetadata(fileMdLoaders.values());
+ checkCondition(success,
+ "Could not load file-metadata for %s partitions of table %s.%s. See "
+ + "catalogd log for details", getPartsResult.getPartitionsSize(), dbName,
+ tblName);
+ for (Entry<Partition, FileMetadataLoader> entry : fileMdLoaders.entrySet()) {
+ FileMetadata filemetadata = new FileMetadata();
+ for (FileDescriptor fd : entry.getValue().getLoadedFds()) {
+ filemetadata.addToData(fd.toThrift().file_desc_data);
+ }
+ entry.getKey().setFileMetadata(filemetadata);
+ }
+ getPartsResult.setDictionary(getSerializedNetworkAddress(hostIndex.getList()));
+ long timeTaken = sw.stop().elapsed(TimeUnit.MILLISECONDS);
+ if (timeTaken > FALLBACK_FILE_MD_TIME_WARN_THRESHOLD_MS) {
+ LOG.info("Loading the file metadata for {} partitions of table {}.{} on the "
+ + "fallback path. Time taken: {} msec",
+ getPartsResult.getPartitionsSize(), dbName, tblName, timeTaken);
+ } else {
+ LOG.debug("Loading the file metadata for {} partitions of table {}.{} on the "
+ + "fallback path. Time taken: {} msec",
+ getPartsResult.getPartitionsSize(), dbName, tblName, timeTaken);
+ }
+ } catch (CatalogException ex) {
+ LOG.error(
+ "Unexpected error when loading file-metadata for partitions of table {}.{}",
+ dbName, tblName, ex);
+ throw new MetaException("Could not load file metadata. Cause " + ex.getMessage());
+ }
+ }
+
+ /**
+ * Loads the file-metadata in parallel using the common thread pool {@code
+ * fallbackFdLoaderPool}.
+ *
+ * @param loaders The FileMetadataLoader objects, one for each path.
+ * @return false if there were errors, else true.
+ */
+ private static boolean getFileMetadata(Collection<FileMetadataLoader> loaders) {
+ List<Pair<Path, Future<Void>>> futures = new ArrayList<>(loaders.size());
+ for (FileMetadataLoader fmdLoader : loaders) {
+ futures.add(new Pair<>(fmdLoader.getPartDir(), fallbackFdLoaderPool.submit(() -> {
+ fmdLoader.load();
+ return null;
+ })));
+ }
+ int numberOfErrorsToLog = 100;
+ int errors = 0;
+ for (Pair<Path, Future<Void>> pair : futures) {
+ try {
+ pair.second.get();
+ } catch (InterruptedException | ExecutionException e) {
+ errors++;
+ if (errors <= numberOfErrorsToLog) {
+ LOG.error("Could not load file-metadata for path {}", pair.first, e);
+ }
+ }
+ }
+ if (errors > numberOfErrorsToLog) {
+ LOG.error("{} loading errors were not logged. Only logged the first {} errors",
+ errors - numberOfErrorsToLog, numberOfErrorsToLog);
+ }
+ return errors == 0;
+ }
+}
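To show how these helpers are meant to be consumed, here is a minimal sketch of an
HMS handler method that serves get_table_req() from the catalogd cache and falls
back to the backing HMS on errors. The handler wiring (the catalog_,
defaultCatalogName_ and client_ fields) and the fallbackToHmsOnErrors() getter name
are assumptions for illustration, not code from this patch:

  // Hypothetical handler method mirroring the fallback_to_hms_on_errors behavior.
  public GetTableResult get_table_req(GetTableRequest req) throws TException {
    try {
      // Serve the request from the catalogd cache when possible.
      return CatalogHmsAPIHelper.getTableReq(catalog_, defaultCatalogName_, req);
    } catch (Exception e) {
      if (!BackendConfig.INSTANCE.fallbackToHmsOnErrors()) { // assumed getter name
        throw new MetaException(e.getMessage());
      }
      // Fall back to the backing HMS service for this request.
      return client_.get_table_req(req); // client_: assumed handle to backing HMS
    }
  }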
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 3703b43..67bdb26 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -54,6 +54,7 @@ import org.apache.impala.analysis.TableName;
import org.apache.impala.authorization.AuthorizationDelta;
import org.apache.impala.authorization.AuthorizationManager;
import org.apache.impala.authorization.AuthorizationPolicy;
+import org.apache.impala.catalog.FeFsTable.Utils;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.catalog.events.ExternalEventsProcessor;
@@ -63,6 +64,9 @@ import org.apache.impala.catalog.events.NoOpEventProcessor;
import org.apache.impala.catalog.events.SelfEventContext;
import org.apache.impala.catalog.monitor.CatalogMonitor;
import org.apache.impala.catalog.monitor.CatalogTableMetrics;
+import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
+import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
+import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.common.Pair;
@@ -273,6 +277,8 @@ public class CatalogServiceCatalog extends Catalog {
// Manages the event processing from metastore for issuing invalidates on tables
private ExternalEventsProcessor metastoreEventProcessor_;
+ private final ICatalogMetastoreServer catalogMetastoreServer_;
+
/**
* See the gflag definition in be/.../catalog-server.cc for details on these modes.
*/
@@ -349,6 +355,8 @@ public class CatalogServiceCatalog extends Catalog {
Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0);
// start polling for metastore events
metastoreEventProcessor_.start();
+ catalogMetastoreServer_ = getCatalogMetastoreServer();
+ catalogMetastoreServer_.start();
}
/**
@@ -365,6 +373,20 @@ public class CatalogServiceCatalog extends Catalog {
}
/**
+ * Returns an instance of CatalogMetastoreServer if start_hms_server configuration is
+ * true. Otherwise, returns a NoOpCatalogMetastoreServer.
+ */
+ @VisibleForTesting
+ protected ICatalogMetastoreServer getCatalogMetastoreServer() {
+ if (!BackendConfig.INSTANCE.startHmsServer()) {
+ return NoOpCatalogMetastoreServer.INSTANCE;
+ }
+ int portNumber = BackendConfig.INSTANCE.getHMSPort();
+ Preconditions.checkState(portNumber > 0, "Invalid port number for HMS service.");
+ return new CatalogMetastoreServer(this);
+ }
+
+ /**
* Check whether the database is in blacklist
*/
public boolean isBlacklistedDb(String dbName) {
@@ -3325,6 +3347,16 @@ public class CatalogServiceCatalog extends Catalog {
*/
public TGetPartialCatalogObjectResponse getPartialCatalogObject(
TGetPartialCatalogObjectRequest req) throws CatalogException {
+ return getPartialCatalogObject(req, "needed by coordinator");
+ }
+
+ /**
+ * A wrapper around doGetPartialCatalogObject() that controls the number of concurrent
+ * invocations.
+ */
+ public TGetPartialCatalogObjectResponse getPartialCatalogObject(
+ TGetPartialCatalogObjectRequest req, String reason) throws CatalogException {
+ Preconditions.checkNotNull(reason);
try {
if (!partialObjectFetchAccess_.tryAcquire(1,
PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S, TimeUnit.SECONDS)) {
@@ -3348,7 +3380,7 @@ public class CatalogServiceCatalog extends Catalog {
try (ThreadNameAnnotator tna = new ThreadNameAnnotator(
"Get Partial Catalog Object - " +
Catalog.toCatalogObjectKey(req.object_desc))) {
- return doGetPartialCatalogObject(req);
+ return doGetPartialCatalogObject(req, reason);
} finally {
partialObjectFetchAccess_.release();
}
@@ -3387,7 +3419,8 @@ public class CatalogServiceCatalog extends Catalog {
* response's lookup_status.
*/
private TGetPartialCatalogObjectResponse doGetPartialCatalogObject(
- TGetPartialCatalogObjectRequest req) throws CatalogException {
+ TGetPartialCatalogObjectRequest req, String tableLoadReason)
+ throws CatalogException {
TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc,
"missing object_desc");
switch (objectDesc.type) {
@@ -3423,7 +3456,7 @@ public class CatalogServiceCatalog extends Catalog {
}
table = getOrLoadTable(
objectDesc.getTable().getDb_name(), objectDesc.getTable().getTbl_name(),
- "needed by coordinator", writeIdList, tableId);
+ tableLoadReason, writeIdList, tableId);
} catch (DatabaseNotFoundException e) {
return createGetPartialCatalogObjectError(CatalogLookupStatus.DB_NOT_FOUND);
}
@@ -3511,7 +3544,10 @@ public class CatalogServiceCatalog extends Catalog {
.map(HdfsPartition.Builder::new)
.collect(Collectors.toList());
new ParallelFileMetadataLoader(
- table, partBuilders, reqWriteIdList, validTxnList, null, logPrefix).load();
+ table.getFileSystem(), partBuilders, reqWriteIdList,
+ validTxnList, Utils.shouldRecursivelyListPartitions(table),
+ table.getHostIndex(), null, logPrefix)
+ .load();
for (HdfsPartition.Builder builder : partBuilders) {
// Let's retrieve the original partition instance from builder because this is
// stored in the keys of 'partToPartialInfoMap'.
diff --git a/fe/src/main/java/org/apache/impala/catalog/GetPartialCatalogObjectRequestBuilder.java b/fe/src/main/java/org/apache/impala/catalog/GetPartialCatalogObjectRequestBuilder.java
new file mode 100644
index 0000000..c012eed
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/GetPartialCatalogObjectRequestBuilder.java
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.thrift.TCatalogObject;
+import org.apache.impala.thrift.TCatalogObjectType;
+import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
+import org.apache.impala.thrift.TTable;
+import org.apache.impala.thrift.TTableInfoSelector;
+
+/**
+ * Simple request builder class. Assumes that all metadata at a higher granularity is
+ * required when a specific level is requested. For example, if files are requested,
+ * it assumes that partition names and partition metadata are also requested.
+ */
+public class GetPartialCatalogObjectRequestBuilder {
+ private boolean wantFileMetadata_;
+ private boolean wantPartitionMeta_;
+ private boolean wantPartitionNames_;
+ private String tblName_, dbName_;
+ private boolean wantHmsTable_;
+ private boolean wantStatsForAllColumns_;
+ private long tableId = CatalogServiceCatalog.TABLE_ID_UNAVAILABLE;
+ private ValidWriteIdList writeIdList_;
+
+ /**
+ * Sets the database name for the request object.
+ */
+ public GetPartialCatalogObjectRequestBuilder db(String db) {
+ this.dbName_ = db;
+ return this;
+ }
+
+ /**
+ * Sets the table name for the request object.
+ */
+ public GetPartialCatalogObjectRequestBuilder tbl(String tbl) {
+ this.tblName_ = tbl;
+ return this;
+ }
+
+ /**
+ * Sets the request fields required for fetching partition names,
+ * HMS Partition objects, and the file-metadata of the partitions.
+ */
+ public GetPartialCatalogObjectRequestBuilder wantFiles() {
+ wantFileMetadata_ = true;
+ wantPartitionMeta_ = true;
+ wantPartitionNames_ = true;
+ return this;
+ }
+
+ /**
+ * Sets the request fields required for fetching partition names and
+ * HMS Partition objects. No file-metadata will be fetched.
+ */
+ public GetPartialCatalogObjectRequestBuilder wantPartitions() {
+ wantPartitionNames_ = true;
+ wantPartitionMeta_ = true;
+ return this;
+ }
+
+ /**
+ * Sets the request fields required for fetching partition names. No
+ * partition metadata or file-metadata will be fetched.
+ */
+ public GetPartialCatalogObjectRequestBuilder wantPartitionNames() {
+ wantPartitionNames_ = true;
+ return this;
+ }
+
+ /**
+ * Sets the request fields for fetching column statistics for all columns.
+ */
+ public GetPartialCatalogObjectRequestBuilder wantStatsForAllColums() {
+ wantStatsForAllColumns_ = true;
+ return this;
+ }
+
+ GetPartialCatalogObjectRequestBuilder tableId(long id) {
+ this.tableId = id;
+ return this;
+ }
+
+ GetPartialCatalogObjectRequestBuilder writeId(String writeIdList) {
+ Preconditions.checkNotNull(writeIdList);
+ writeIdList_ = new ValidReaderWriteIdList();
+ writeIdList_.readFromString(writeIdList);
+ return this;
+ }
+
+ GetPartialCatalogObjectRequestBuilder writeId(
+ ValidWriteIdList validWriteIdList) {
+ writeIdList_ = Preconditions.checkNotNull(validWriteIdList);
+ return this;
+ }
+
+ /**
+ * Builds the {@link TGetPartialCatalogObjectRequest} object based on the fields
+ * set in this Builder.
+ */
+ public TGetPartialCatalogObjectRequest build() {
+ TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+ req.object_desc = new TCatalogObject();
+ req.object_desc.setType(TCatalogObjectType.TABLE);
+ req.object_desc.table = new TTable(dbName_, tblName_);
+ req.table_info_selector = new TTableInfoSelector();
+ req.table_info_selector.want_hms_table = true;
+ req.table_info_selector.table_id = tableId;
+ if (writeIdList_ != null) {
+ req.table_info_selector.valid_write_ids = MetastoreShim
+ .convertToTValidWriteIdList(writeIdList_);
+ }
+ if (wantPartitionNames_) {
+ req.table_info_selector.want_partition_names = true;
+ }
+ if (wantPartitionMeta_) {
+ req.table_info_selector.want_partition_metadata = true;
+ }
+ if (wantFileMetadata_) {
+ req.table_info_selector.want_partition_files = true;
+ }
+ if (wantStatsForAllColumns_) {
+ req.table_info_selector.want_stats_for_all_columns = true;
+ }
+ return req;
+ }
+}
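For reference, a typical use of this builder (as in CatalogHmsAPIHelper.getTableReq())
looks like the sketch below; the database and table names are hypothetical:

  // Request the HMS table, all partitions with their file-metadata, and
  // statistics for all columns of tpcds.store_sales.
  TGetPartialCatalogObjectRequest req = new GetPartialCatalogObjectRequestBuilder()
      .db("tpcds")
      .tbl("store_sales")
      .wantFiles()
      .wantStatsForAllColums() // method name spelled as defined above
      .build();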
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index f0ecdee..0539387 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -742,9 +742,9 @@ public class HdfsTable extends Table implements FeFsTable {
// we'll throw an exception here and end up bailing out of whatever catalog operation
// we're in the middle of. This could cause a partial metadata update -- eg we may
// have refreshed the top-level table properties without refreshing the files.
- ParallelFileMetadataLoader loader = new ParallelFileMetadataLoader(
- this, partBuilders, validWriteIds_, validTxnList, debugActions, logPrefix);
- loader.load();
+ new ParallelFileMetadataLoader(getFileSystem(), partBuilders, validWriteIds_,
+ validTxnList, Utils.shouldRecursivelyListPartitions(this),
+ getHostIndex(), debugActions, logPrefix).load();
// TODO(todd): would be good to log a summary of the loading process:
// - how many block locations did we reuse/load individually/load via batch
diff --git a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
index 3f8fdcc..098fdc8 100644
--- a/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/ParallelFileMetadataLoader.java
@@ -20,6 +20,7 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@@ -27,6 +28,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import javax.annotation.Nullable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -36,6 +38,8 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,10 +73,11 @@ public class ParallelFileMetadataLoader {
private final Map<Path, List<HdfsPartition.Builder>> partsByPath_;
private final FileSystem fs_;
- public ParallelFileMetadataLoader(HdfsTable table,
+ public ParallelFileMetadataLoader(FileSystem fs,
Collection<HdfsPartition.Builder> partBuilders,
- ValidWriteIdList writeIdList, ValidTxnList validTxnList, String debugAction,
- String logPrefix) throws CatalogException {
+ ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive,
+ @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix)
+ throws CatalogException {
if (writeIdList != null || validTxnList != null) {
// make sure that both either both writeIdList and validTxnList are set or both
// of them are not.
@@ -91,8 +96,8 @@ public class ParallelFileMetadataLoader {
for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
- Utils.shouldRecursivelyListPartitions(table), oldFds, table.getHostIndex(),
- validTxnList, writeIdList, e.getValue().get(0).getFileFormat());
+ isRecursive, oldFds, hostIndex, validTxnList, writeIdList,
+ e.getValue().get(0).getFileFormat());
// If there is a cached partition mapped to this path, we recompute the block
// locations even if the underlying files have not changed.
// This is done to keep the cached block metadata up to date.
@@ -103,7 +108,7 @@ public class ParallelFileMetadataLoader {
loaders_.put(e.getKey(), loader);
}
this.logPrefix_ = logPrefix;
- this.fs_ = table.getFileSystem();
+ this.fs_ = fs;
}
/**
@@ -153,7 +158,7 @@ public class ParallelFileMetadataLoader {
List<Pair<FileMetadataLoader, Future<Void>>> futures =
new ArrayList<>(loaders_.size());
for (FileMetadataLoader loader : loaders_.values()) {
- futures.add(new Pair<FileMetadataLoader, Future<Void>>(
+ futures.add(new Pair<>(
loader, pool.submit(() -> { loader.load(); return null; })));
}
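With this refactoring the loader no longer depends on HdfsTable: callers pass the
filesystem, recursion flag, and host index explicitly. The updated call site in
HdfsTable.java earlier in this diff shows the pattern:

  new ParallelFileMetadataLoader(getFileSystem(), partBuilders, validWriteIds_,
      validTxnList, Utils.shouldRecursivelyListPartitions(this),
      getHostIndex(), debugActions, logPrefix).load();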
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 910bd49..3d8fa13 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -660,10 +660,13 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
// is done while we continue to hold the table lock.
resp.table_info.setHms_table(getMetaStoreTable().deepCopy());
}
- if (selector.want_stats_for_column_names != null) {
- List<ColumnStatisticsObj> statsList = Lists.newArrayListWithCapacity(
- selector.want_stats_for_column_names.size());
- for (String colName: selector.want_stats_for_column_names) {
+ if (selector.want_stats_for_column_names != null ||
+ selector.want_stats_for_all_columns) {
+ List<String> colList = selector.want_stats_for_all_columns ? getColumnNames() :
+ selector.want_stats_for_column_names;
+ List<ColumnStatisticsObj> statsList =
+ Lists.newArrayListWithCapacity(colList.size());
+ for (String colName: colList) {
Column col = getColumn(colName);
if (col == null) continue;
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
new file mode 100644
index 0000000..08e30ce
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogHmsClientUtils.java
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import static org.apache.impala.catalog.CatalogHmsAPIHelper.IMPALA_TNETWORK_ADDRESSES;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.FileMetadata;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.ObjectDictionary;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogHmsAPIHelper;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.thrift.THdfsFileDesc;
+import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.util.ListMap;
+import org.apache.thrift.TDeserializer;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol.Factory;
+
+public class CatalogHmsClientUtils {
+
+ /**
+ * Util method to deserialize the
+ * {@link FileMetadata} in the {@link GetPartitionsByNamesResult} into a list of
+ * {@link FileDescriptor} objects. The returned Map maps each partition to its
+ * List of FileDescriptors.
+ * @param getPartitionsResult The getPartitionsByNamesResult as returned by the HMS API
+ * {@code getPartitionsByNames}. If this param does not contain fileMetadata the method
+ * throws an exception.
+ * @param hostIndex The HostIndex to be used to clone the FileDescriptors before
+ * returning. This is useful to map the FileDescriptors to an existing
+ * HostIndex which is available for the table.
+ * @return Map of Partition to its FileDescriptors. Note that a list can be empty
+ * if there are no files in a partition.
+ * @throws CatalogException If there are any deserialization errors.
+ */
+ public static Map<Partition, List<FileDescriptor>> extractFileDescriptors(
+ GetPartitionsByNamesResult getPartitionsResult, ListMap<TNetworkAddress> hostIndex)
+ throws CatalogException {
+ // if there are no partitions in the result, return early
+ if (getPartitionsResult.getPartitionsSize() == 0) return new HashMap<>(0);
+ Preconditions
+ .checkArgument(getPartitionsResult.isSetDictionary(), "Host info is unavailable");
+ List<TNetworkAddress> hostInfo = deserializeNetworkAddresses(
+ getPartitionsResult.getDictionary());
+ Map<Partition, List<FileDescriptor>> fds = Maps
+ .newHashMapWithExpectedSize(getPartitionsResult.getPartitionsSize());
+ for (Partition part : getPartitionsResult.getPartitions()) {
+ Preconditions.checkArgument(part.isSetFileMetadata(),
+ "Filemetadata is not set for partition with values " + part.getValues());
+ FileMetadata fileMetadata = part.getFileMetadata();
+ List<FileDescriptor> partitionFds = Lists
+ .newArrayListWithCapacity(fileMetadata.getDataSize());
+ if (fileMetadata.getData() != null) {
+ for (ByteBuffer data : fileMetadata.getData()) {
+ FileDescriptor fd = FileDescriptor.fromThrift(new THdfsFileDesc(data));
+ fd.cloneWithNewHostIndex(hostInfo, hostIndex);
+ partitionFds.add(fd);
+ }
+ }
+ fds.put(part, partitionFds);
+ }
+ return fds;
+ }
+
+  /**
+   * Util method to extract the FileDescriptors from the Table returned
+   * from HMS. The method expects that the result has FileMetadata set in the table
+   * along with the host network addresses serialized in the table dictionary.
+   * @param tbl The table returned from the HMS API getTableReq.
+   * @param hostIndex The hostIndex of the table which is used to clone the returned
+   * FileDescriptors.
+   * @return List of FileDescriptors for the table.
+   * @throws CatalogException If there are any deserialization errors.
+   */
+ public static List<FileDescriptor> extractFileDescriptors(Table tbl,
+ ListMap<TNetworkAddress> hostIndex) throws CatalogException {
+    String fullTblName = tbl.getDbName() + "." + tbl.getTableName();
+ Preconditions.checkArgument(tbl.isSetDictionary(),
+ "Host info is not available in the table " + fullTblName);
+ List<TNetworkAddress> hostInfo = deserializeNetworkAddresses(
+ tbl.getDictionary());
+ Preconditions.checkArgument(tbl.isSetFileMetadata(),
+ "Filemetadata is not set for table " + fullTblName);
+ FileMetadata fileMetadata = tbl.getFileMetadata();
+ // it is possible that there are no files in the table.
+ if (fileMetadata.getData() == null) return Collections.emptyList();
+ List<FileDescriptor> tableFds = Lists
+ .newArrayListWithCapacity(fileMetadata.getDataSize());
+ for (ByteBuffer data : fileMetadata.getData()) {
+ FileDescriptor fd = FileDescriptor.fromThrift(new THdfsFileDesc(data));
+ fd.cloneWithNewHostIndex(hostInfo, hostIndex);
+ tableFds.add(fd);
+ }
+ return tableFds;
+ }
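+  // Usage sketch for the table-level variant (hypothetical names): given a Table
+  // 'tbl' returned by get_table_req with both fileMetadata and dictionary set, and
+  // the catalog table's existing host index 'hostIndex':
+  //   List<FileDescriptor> fds =
+  //       CatalogHmsClientUtils.extractFileDescriptors(tbl, hostIndex);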
+
+  /**
+   * Util method to deserialize a given {@link ObjectDictionary} object into a list of
+   * {@link TNetworkAddress}. This is used to deserialize the output of
+   * {@link CatalogHmsAPIHelper#getSerializedNetworkAddress(List)}.
+   * @param dictionary The ObjectDictionary from the HMS result which contains the
+   * serialized network addresses.
+   * @return List of deserialized TNetworkAddress, or null if the given dictionary is
+   * null.
+   * @throws CatalogException in case of deserialization errors.
+   */
+ private static List<TNetworkAddress> deserializeNetworkAddresses(
+ ObjectDictionary dictionary) throws CatalogException {
+ if (dictionary == null) return null;
+    if (dictionary.getValuesSize() == 0) return Collections.emptyList();
+ if (!dictionary.getValues()
+ .containsKey(IMPALA_TNETWORK_ADDRESSES)) {
+ throw new CatalogException("Key " + IMPALA_TNETWORK_ADDRESSES + " not found");
+ }
+ List<ByteBuffer> serializedNetAddresses = dictionary.getValues()
+ .get(IMPALA_TNETWORK_ADDRESSES);
+ List<TNetworkAddress> networkAddresses = Lists
+ .newArrayListWithCapacity(serializedNetAddresses.size());
+ int index = 0;
+ TDeserializer deserializer = new TDeserializer(new Factory());
+ for (ByteBuffer serializedData : serializedNetAddresses) {
+ TNetworkAddress networkAddress = new TNetworkAddress();
+ try {
+ deserializer.deserialize(networkAddress, serializedData.array());
+ networkAddresses.add(networkAddress);
+ index++;
+ } catch (TException tException) {
+ throw new CatalogException(
+ "Could not deserialize network address at position " + index);
+ }
+ }
+ return networkAddresses;
+ }
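+  // The expected dictionary layout (based on
+  // CatalogHmsAPIHelper#getSerializedNetworkAddress) is a single key,
+  // IMPALA_TNETWORK_ADDRESSES, mapping to a list of ByteBuffers, each holding one
+  // compact-protocol serialized TNetworkAddress.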
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
new file mode 100644
index 0000000..3414f4d
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.common.Metrics;
+import org.apache.impala.service.BackendConfig;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.ServerContext;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServerEventHandler;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * CatalogMetastoreServer implements the ThriftHiveMetastore interface. This is useful
+ * to expose HMS APIs via the catalog server. Currently, most of the API
+ * implementations are "pass-through" to the Metastore server except for the following
+ * three, which are mostly useful for getting table and partition level metadata
+ * during query planning:
+ * 1. get_table_req
+ * 2. get_partitions_by_expr
+ * 3. get_partitions_by_names_req
+ *
+ * This class mostly deals with the thrift server instantiation and its lifecycle
+ * management. The actual implementation of the HMS APIs is done in the
+ * {@link CatalogMetastoreServiceHandler} class.
+ */
+public class CatalogMetastoreServer extends ThriftHiveMetastore implements
+ ICatalogMetastoreServer {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CatalogMetastoreServer.class);
+
+  // Maximum number of bytes to read from the transport for variable length fields
+  // (strings, bytes). Also used as the maximum number of elements to read for
+  // container fields (maps, lists, etc.).
+ private static final int MAX_MESSAGE_SIZE = 100 * 1024 * 1024;
+
+ // Minimum number of thrift server threads (concurrent connections) which serve the
+ // clients. // TODO make this configurable
+ private static final int MIN_SERVER_THREADS = 1;
+ // Maximum number of thrift server threads (concurrent connections) which serve the
+ // clients. // TODO make this configurable. A connection which is beyond this limit
+ // will be blocked until a server thread is closed.
+ private static final int MAX_SERVER_THREADS = 500;
+
+ private static final String ACTIVE_CONNECTIONS_METRIC = "metastore.active.connections";
+ private static final String RPC_DURATION_FORMAT_METRIC = "metastore.rpc.duration.%s";
+
+ // flag to indicate if the server is started or not
+ private final AtomicBoolean started_ = new AtomicBoolean(false);
+
+ // the server is started in a daemon thread so that instantiating this is not
+ // a blocking call.
+ private CompletableFuture<Void> serverHandle_;
+
+  // reference to the CatalogServiceCatalog object
+ private final CatalogServiceCatalog catalog_;
+
+  // Metrics for this Metastore server. This instance is also passed to the
+  // TServerEventHandler when the server is started so that RPC metrics can be
+  // registered.
+ private final Metrics metrics_ = new Metrics();
+
+ public CatalogMetastoreServer(CatalogServiceCatalog catalogServiceCatalog) {
+ Preconditions.checkNotNull(catalogServiceCatalog);
+ catalog_ = catalogServiceCatalog;
+ }
+
+ /**
+ * Simple RpcEventHandler which adds metrics for this Metastore server
+ */
+ private class RpcMetricsEventHandler implements TServerEventHandler {
+
+ @Override
+ public void preServe() {}
+
+ @Override
+ public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
+ metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).inc();
+ return null;
+ }
+
+ @Override
+ public void deleteContext(ServerContext serverContext, TProtocol tProtocol,
+ TProtocol tProtocol1) {
+ metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).dec();
+ }
+
+ @Override
+ public void processContext(ServerContext serverContext, TTransport tTransport,
+ TTransport tTransport1) {
+ }
+ }
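+  // With this event handler installed, the metrics registry ends up with a single
+  // connection counter ("metastore.active.connections") plus, via the
+  // TimingInvocationHandler below, one timer per RPC name, e.g.
+  // "metastore.rpc.duration.get_table_req" (see RPC_DURATION_FORMAT_METRIC).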
+
+  /**
+   * Simple wrapper InvocationHandler which registers duration metrics for each method
+   * called on the Proxy instance. The method execution is delegated to the handler
+   * instance in the invoke method. Using such an invocation handler is much simpler
+   * than wrapping all the methods in the {@link CatalogMetastoreServiceHandler}.
+   * Additionally, this class also logs an error with the full trace in case the
+   * method invocation fails.
+   */
+ private class TimingInvocationHandler implements InvocationHandler {
+
+ private final CatalogMetastoreServiceHandler handler_;
+
+ TimingInvocationHandler(CatalogMetastoreServiceHandler handler) {
+ Preconditions.checkNotNull(handler);
+ handler_ = handler;
+ }
+
+    /**
+     * This method is called on every HMS API invocation. We invoke the method on the
+     * handler class with the given set of arguments. Additionally, the duration of
+     * each such API call is recorded here.
+     */
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ Timer.Context context =
+ metrics_.getTimer(String.format(RPC_DURATION_FORMAT_METRIC, method.getName()))
+ .time();
+ try {
+ LOG.debug("Invoking HMS API: {}", method.getName());
+ return method.invoke(handler_, args);
+ } catch (Exception ex) {
+ Throwable unwrapped = unwrap(ex);
+ LOG.error("Received exception while executing " + method.getName() + " : ",
+ unwrapped);
+ throw unwrapped;
+ } finally {
+ context.stop();
+ }
+ }
+
+    /**
+     * The InvocationHandler throws an InvocationTargetException if the underlying
+     * method throws an exception. This method unwraps the underlying cause of such an
+     * exception and returns it if available.
+     */
+ private Throwable unwrap(Exception ex) {
+ if (ex instanceof InvocationTargetException) {
+ return ((InvocationTargetException) ex).getTargetException();
+ }
+ return ex;
+ }
+ }
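+  // The handler is wired behind a dynamic proxy in start() below; conceptually
+  // (a sketch of what start() does, 'handler' being the service handler instance):
+  //   ThriftHiveMetastore.Iface timedIface =
+  //       (ThriftHiveMetastore.Iface) Proxy.newProxyInstance(
+  //           ThriftHiveMetastore.Iface.class.getClassLoader(),
+  //           new Class[] {ThriftHiveMetastore.Iface.class},
+  //           new TimingInvocationHandler(handler));
+  // so every Iface method is timed and error-logged without per-method boilerplate.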
+
+ @VisibleForTesting
+ protected int getPort() throws CatalogException {
+ return BackendConfig.INSTANCE.getHMSPort();
+ }
+
+  /**
+   * Starts the thrift server in a background thread on the configured port.
+   * Currently, only NOSASL mode is supported. TODO: Add SASL and SSL support
+   * (IMPALA-10638).
+   *
+   * @throws CatalogException
+   */
+ public synchronized void start() throws CatalogException {
+ final int portNumber = getPort();
+ Preconditions.checkState(portNumber > 0);
+ Preconditions.checkState(!started_.get(), "Metastore server is already started");
+ LOG.info("Starting the Metastore server at port number {}", portNumber);
+ CatalogMetastoreServiceHandler handler =
+ new CatalogMetastoreServiceHandler(catalog_, metrics_,
+ BackendConfig.INSTANCE.fallbackToHMSOnErrors());
+ // create a proxy class for the ThriftMetastore.Iface and ICatalogMetastoreServer
+ // so that all the APIs can be invoked via a TimingInvocationHandler
+ ThriftHiveMetastore.Iface proxyCatalogHMSIFace =
+ (ThriftHiveMetastore.Iface) Proxy
+ .newProxyInstance(ThriftHiveMetastore.Iface.class.getClassLoader(),
+ new Class[]{ThriftHiveMetastore.Iface.class,
+ ICatalogMetastoreServer.class},
+ new TimingInvocationHandler(handler));
+ //TODO Add Sasl support (IMPALA-10638)
+ final TProtocolFactory protocolFactory;
+ final TProtocolFactory inputProtoFactory;
+ //TODO add config for this (IMPALA-10639)
+ boolean useCompactProtocol = false;
+ if (useCompactProtocol) {
+ protocolFactory = new TCompactProtocol.Factory();
+ inputProtoFactory = new TCompactProtocol.Factory(MAX_MESSAGE_SIZE,
+ MAX_MESSAGE_SIZE);
+ } else {
+ protocolFactory = new TBinaryProtocol.Factory();
+ inputProtoFactory = new TBinaryProtocol.Factory(true, true, MAX_MESSAGE_SIZE,
+ MAX_MESSAGE_SIZE);
+ }
+
+ TProcessor processor;
+ try {
+ processor =
+ new ThriftHiveMetastore.Processor<>(proxyCatalogHMSIFace);
+ } catch (Exception e) {
+ throw new CatalogException("Unable to create processor for catalog metastore "
+ + "server", e);
+ }
+
+ //TODO add SSL support
+ boolean useSSL = false;
+ TServerSocket serverSocket;
+ try {
+ serverSocket =
+ new TServerSocketKeepAlive(
+ new TServerSocket(new InetSocketAddress(portNumber)));
+ } catch (TTransportException e) {
+ throw new CatalogException(
+ "Unable to create server socket at port number " + portNumber, e);
+ }
+
+ TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
+ .processor(processor)
+ .transportFactory(new TTransportFactory())
+ .protocolFactory(protocolFactory)
+ .inputProtocolFactory(inputProtoFactory)
+ .minWorkerThreads(MIN_SERVER_THREADS)
+ .maxWorkerThreads(MAX_SERVER_THREADS);
+
+ TServer tServer = new TThreadPoolServer(args);
+ TServerEventHandler rpcMetricsEventHandler = new RpcMetricsEventHandler();
+
+ tServer.setServerEventHandler(rpcMetricsEventHandler);
+ LOG.info("Started the new metaserver on port [" + portNumber
+ + "]...");
+ LOG.info("minWorkerThreads = "
+ + MIN_SERVER_THREADS);
+ LOG.info("maxWorkerThreads = "
+ + MAX_SERVER_THREADS);
+ LOG.info("Enable SSL = " + useSSL);
+ serverHandle_ = CompletableFuture.runAsync(() -> tServer.serve());
+ started_.set(true);
+ }
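+  // Any standard HMS thrift client can talk to this endpoint. A connection sketch,
+  // assuming the default NOSASL/binary-protocol configuration (the host name is
+  // hypothetical; 5899 is the default hms_port):
+  //   TTransport transport = new TSocket("catalogd-host", 5899);
+  //   transport.open();
+  //   ThriftHiveMetastore.Client client =
+  //       new ThriftHiveMetastore.Client(new TBinaryProtocol(transport));
+  //   GetTableResult res = client.get_table_req(new GetTableRequest("db", "tbl"));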
+
+  /**
+   * Returns the RPC and connection metrics for this metastore server.
+   * TODO: hook this method up to the Catalog's debug UI.
+   */
+ @Override
+ public String getMetrics() {
+ return metrics_.toString();
+ }
+
+  /**
+   * Stops this CatalogMetastoreServer on a best-effort basis. May interrupt running
+   * threads in the server.
+   * <p>
+   * TODO: currently this method is not used anywhere. We should hook this method into
+   * the shutdown process of catalogd.
+   */
+ public void stop() throws CatalogException {
+ serverHandle_.cancel(true);
+ }
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
new file mode 100644
index 0000000..be00d69
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import org.apache.hadoop.hive.metastore.api.GetFieldsRequest;
+import org.apache.hadoop.hive.metastore.api.GetFieldsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
+import org.apache.hadoop.hive.metastore.api.GetSchemaRequest;
+import org.apache.hadoop.hive.metastore.api.GetSchemaResponse;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.SeedTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTxnIdRequest;
+import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.catalog.CatalogHmsAPIHelper;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.common.Metrics;
+import org.apache.impala.service.BackendConfig;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the HMS APIs that are served by CatalogD
+ * and is exposed via {@link CatalogMetastoreServer}.
+ * HMS APIs that are redirected to HMS can be found in {@link MetastoreServiceHandler}.
+ */
+public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(CatalogMetastoreServiceHandler.class);
+
+ public CatalogMetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+ boolean fallBackToHMSOnErrors) {
+ super(catalog, metrics, fallBackToHMSOnErrors);
+ }
+
+ @Override
+ public GetTableResult get_table_req(GetTableRequest getTableRequest)
+ throws MetaException, NoSuchObjectException, TException {
+
+ if (!BackendConfig.INSTANCE.enableCatalogdHMSCache()) {
+ return super.get_table_req(getTableRequest);
+ }
+
+ try {
+ LOG.trace("Received get_Table_req for {}. File metadata is {}",
+ getTableRequest.getTblName(), getTableRequest.isGetFileMetadata());
+ return CatalogHmsAPIHelper.getTableReq(catalog_, defaultCatalogName_,
+ getTableRequest);
+ } catch (Exception e) {
+ // we catch the CatalogException and fall-back to HMS
+ throwIfNoFallback(e, "get_table_req");
+ }
+ return super.get_table_req(getTableRequest);
+ }
+
+  /**
+   * This is the main API used by Hive to get partitions. Hive pushes the partition
+   * pruning logic to HMS by sending over the expression which is used to filter the
+   * partitions during query compilation. The expression is specific to Hive and is
+   * loaded at runtime based on whether the hive-exec jar is in the classpath. If the
+   * hive-exec jar is not present in the classpath, we fall back to HMS since Catalog
+   * has no way to deserialize the expression sent over by the client.
+   */
+ @Override
+ public PartitionsByExprResult get_partitions_by_expr(
+ PartitionsByExprRequest partitionsByExprRequest) throws TException {
+
+ if (!BackendConfig.INSTANCE.enableCatalogdHMSCache()) {
+ return super.get_partitions_by_expr(partitionsByExprRequest);
+ }
+
+ try {
+      // expressionProxy_ is null if there were errors when loading the
+      // PartitionExpressionProxy.
+ if (expressionProxy_ != null) {
+ return CatalogHmsAPIHelper.getPartitionsByExpr(
+ catalog_, defaultCatalogName_, partitionsByExprRequest, expressionProxy_);
+ } else {
+ throw new CatalogException("PartitionExpressionProxy could not be initialized");
+ }
+ } catch (Exception e) {
+ // we catch the CatalogException and fall-back to HMS
+ throwIfNoFallback(e, GET_PARTITION_BY_EXPR);
+ }
+ String tblName =
+ partitionsByExprRequest.getDbName() + "." + partitionsByExprRequest.getTblName();
+ LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_EXPR, tblName));
+ return super.get_partitions_by_expr(partitionsByExprRequest);
+ }
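+  // Note: PartitionsByExprRequest carries the filter as a serialized byte payload
+  // that only hive-exec classes can deserialize; catalogd can evaluate it only when
+  // expressionProxy_ was loaded successfully, hence the fallback above.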
+
+ /**
+ * HMS API to get the partitions filtered by a provided list of names. The request
+ * contains a list of partitions names which the client is interested in. Catalog
+ * returns the partitions only for requested names. Additionally, this API also returns
+ * the file-metadata for the returned partitions if the request has
+ * {@code getFileMetadata} flag set. In case of errors, this API falls back to HMS if
+ * {@code fallBackToHMSOnErrors_} is set.
+ */
+ @Override
+ public GetPartitionsByNamesResult get_partitions_by_names_req(
+ GetPartitionsByNamesRequest getPartitionsByNamesRequest) throws TException {
+
+ if (!BackendConfig.INSTANCE.enableCatalogdHMSCache()) {
+ return super.get_partitions_by_names_req(getPartitionsByNamesRequest);
+ }
+
+ try {
+ return CatalogHmsAPIHelper
+ .getPartitionsByNames(catalog_, serverConf_, getPartitionsByNamesRequest);
+ } catch (Exception ex) {
+ throwIfNoFallback(ex, GET_PARTITION_BY_NAMES);
+ }
+ String tblName =
+ getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
+ .getTbl_name();
+ LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_NAMES, tblName));
+ return super.get_partitions_by_names_req(getPartitionsByNamesRequest);
+ }
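+  // Request shape sketch (hypothetical values), mirroring what a planner-side client
+  // would send to fetch two partitions along with their file descriptors:
+  //   GetPartitionsByNamesRequest req = new GetPartitionsByNamesRequest("db", "tbl");
+  //   req.setNames(Arrays.asList("p=1", "p=2"));
+  //   req.setGetFileMetadata(true);  // assumes this flag exists in the thrift defs
+  //   GetPartitionsByNamesResult res = handler.get_partitions_by_names_req(req);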
+}
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
new file mode 100644
index 0000000..e5aa7a9
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import org.apache.impala.catalog.CatalogException;
+
+/**
+ * This is the main Interface which a CatalogMetastore service should implement. It
+ * provides lifecycle and monitoring methods which are called from
+ * CatalogServiceCatalog to instantiate a Metastore service.
+ */
+public interface ICatalogMetastoreServer {
+
+ /**
+ * Starts the metastore service
+ * @throws CatalogException
+ */
+ void start() throws CatalogException;
+
+ /**
+ * Stop the metastore service.
+ * @throws CatalogException
+ */
+ void stop() throws CatalogException;
+
+ /**
+ * Returns the metrics for this Catalog Metastore service.
+   * @return Encoded String (e.g. JSON format) representation of all the metrics for this
+ * metastore service.
+ */
+ String getMetrics();
+}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
new file mode 100644
index 0000000..9a1b6ba
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -0,0 +1,2736 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import com.facebook.fb303.fb_status;
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.PartFilterExprUtil;
+import org.apache.hadoop.hive.metastore.PartitionExpressionProxy;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddCheckConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AddDefaultConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AddForeignKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AddNotNullConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.AddPrimaryKeyRequest;
+import org.apache.hadoop.hive.metastore.api.AddUniqueConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.AlterCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.AlterISchemaRequest;
+import org.apache.hadoop.hive.metastore.api.AlterPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.AlterPartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.AlterTableRequest;
+import org.apache.hadoop.hive.metastore.api.AlterTableResponse;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.CacheFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.CheckConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.CheckConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.ClearFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.CmRecycleRequest;
+import org.apache.hadoop.hive.metastore.api.CmRecycleResponse;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.ConfigValSecurityException;
+import org.apache.hadoop.hive.metastore.api.CreateCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.CreateTableRequest;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DefaultConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.DefaultConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.DropCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.DropConstraintRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsResult;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.ExtendedTableInfo;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.FindSchemasByColsResp;
+import org.apache.hadoop.hive.metastore.api.FindSchemasByColsRqst;
+import org.apache.hadoop.hive.metastore.api.FireEventRequest;
+import org.apache.hadoop.hive.metastore.api.FireEventResponse;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysRequest;
+import org.apache.hadoop.hive.metastore.api.ForeignKeysResponse;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.GetAllFunctionsResponse;
+import org.apache.hadoop.hive.metastore.api.GetCatalogRequest;
+import org.apache.hadoop.hive.metastore.api.GetCatalogResponse;
+import org.apache.hadoop.hive.metastore.api.GetCatalogsResponse;
+import org.apache.hadoop.hive.metastore.api.GetDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.GetFieldsRequest;
+import org.apache.hadoop.hive.metastore.api.GetFieldsResponse;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataByExprResult;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.GetFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionNamesPsRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionResponse;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsPsWithAuthResponse;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GetPrincipalsInRoleResponse;
+import org.apache.hadoop.hive.metastore.api.GetReplicationMetricsRequest;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalRequest;
+import org.apache.hadoop.hive.metastore.api.GetRoleGrantsForPrincipalResponse;
+import org.apache.hadoop.hive.metastore.api.GetRuntimeStatsRequest;
+import org.apache.hadoop.hive.metastore.api.GetSchemaRequest;
+import org.apache.hadoop.hive.metastore.api.GetSchemaResponse;
+import org.apache.hadoop.hive.metastore.api.GetSerdeRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableResult;
+import org.apache.hadoop.hive.metastore.api.GetTablesExtRequest;
+import org.apache.hadoop.hive.metastore.api.GetTablesRequest;
+import org.apache.hadoop.hive.metastore.api.GetTablesResult;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokePrivilegeResponse;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleRequest;
+import org.apache.hadoop.hive.metastore.api.GrantRevokeRoleResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectPrivilege;
+import org.apache.hadoop.hive.metastore.api.HiveObjectRef;
+import org.apache.hadoop.hive.metastore.api.ISchema;
+import org.apache.hadoop.hive.metastore.api.ISchemaName;
+import org.apache.hadoop.hive.metastore.api.InvalidInputException;
+import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.MapSchemaVersionToSerdeRequest;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdRequest;
+import org.apache.hadoop.hive.metastore.api.MaxAllocatedTableWriteIdResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.NotNullConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.NotNullConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountRequest;
+import org.apache.hadoop.hive.metastore.api.NotificationEventsCountResponse;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.OptionalCompactionInfoStruct;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.PartitionEventType;
+import org.apache.hadoop.hive.metastore.api.PartitionSpec;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionValuesResponse;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsByExprResult;
+import org.apache.hadoop.hive.metastore.api.PartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsResponse;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.PartitionsStatsResult;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysRequest;
+import org.apache.hadoop.hive.metastore.api.PrimaryKeysResponse;
+import org.apache.hadoop.hive.metastore.api.PrincipalPrivilegeSet;
+import org.apache.hadoop.hive.metastore.api.PrincipalType;
+import org.apache.hadoop.hive.metastore.api.PrivilegeBag;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataRequest;
+import org.apache.hadoop.hive.metastore.api.PutFileMetadataResult;
+import org.apache.hadoop.hive.metastore.api.RenamePartitionRequest;
+import org.apache.hadoop.hive.metastore.api.RenamePartitionResponse;
+import org.apache.hadoop.hive.metastore.api.ReplicationMetricList;
+import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
+import org.apache.hadoop.hive.metastore.api.Role;
+import org.apache.hadoop.hive.metastore.api.RuntimeStat;
+import org.apache.hadoop.hive.metastore.api.SQLCheckConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLDefaultConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
+import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
+import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
+import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
+import org.apache.hadoop.hive.metastore.api.ScheduledQuery;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryKey;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryMaintenanceRequest;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollRequest;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryPollResponse;
+import org.apache.hadoop.hive.metastore.api.ScheduledQueryProgressInfo;
+import org.apache.hadoop.hive.metastore.api.SchemaVersion;
+import org.apache.hadoop.hive.metastore.api.SchemaVersionDescriptor;
+import org.apache.hadoop.hive.metastore.api.SeedTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.SeedTxnIdRequest;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
+import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsResponse;
+import org.apache.hadoop.hive.metastore.api.SetSchemaVersionStateRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableMeta;
+import org.apache.hadoop.hive.metastore.api.TableStatsRequest;
+import org.apache.hadoop.hive.metastore.api.TableStatsResult;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface;
+import org.apache.hadoop.hive.metastore.api.TruncateTableRequest;
+import org.apache.hadoop.hive.metastore.api.TruncateTableResponse;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.Type;
+import org.apache.hadoop.hive.metastore.api.UniqueConstraintsRequest;
+import org.apache.hadoop.hive.metastore.api.UniqueConstraintsResponse;
+import org.apache.hadoop.hive.metastore.api.UnknownDBException;
+import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterPoolRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterPoolResponse;
+import org.apache.hadoop.hive.metastore.api.WMAlterResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMAlterTriggerRequest;
+import org.apache.hadoop.hive.metastore.api.WMAlterTriggerResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrDropTriggerToPoolMappingRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrDropTriggerToPoolMappingResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrUpdateMappingRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateOrUpdateMappingResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreatePoolRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreatePoolResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMCreateTriggerRequest;
+import org.apache.hadoop.hive.metastore.api.WMCreateTriggerResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropMappingRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropMappingResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropPoolRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropPoolResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMDropTriggerRequest;
+import org.apache.hadoop.hive.metastore.api.WMDropTriggerResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetActiveResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetActiveResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetAllResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetAllResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMGetTriggersForResourePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMGetTriggersForResourePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanRequest;
+import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.catalog.CatalogHmsAPIHelper;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.common.Metrics;
+import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.util.AcidUtils;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class implements the HMS APIs that are redirected to the HMS server from
+ * CatalogD. APIs that should be served from CatalogD are overridden in
+ * {@link CatalogMetastoreServiceHandler}.
+ * <p>
+ * Implementation Notes: Care should be taken to use the
+ * {@link IMetaStoreClient#getThriftClient()}
+ * method when forwarding an API call to the HMS service, since IMetastoreClient itself
+ * modifies the arguments before sending the RPC to the HMS server. This can lead to
+ * unexpected side-effects (e.g. processorCapabilities not matching the actual client).
+ */
+public abstract class MetastoreServiceHandler implements Iface {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(MetastoreServiceHandler.class);
+ protected static final String NOT_IMPLEMENTED_UNSUPPORTED = "%s method not supported"
+ + " by Catalog metastore service.";
+ protected static final String METAEXCEPTION_MSG_FORMAT =
+ "Unexpected error occurred while"
+ + " executing %s. Cause: %s. See catalog logs for details.";
+ protected static final String HMS_FALLBACK_MSG_FORMAT = "Forwarding the request %s for "
+ + "table %s to the backing HiveMetastore service";
+
+  // constants used for logging error messages
+ public static final String GET_PARTITION_BY_EXPR = "get_partitions_by_expr";
+ public static final String GET_PARTITION_BY_NAMES = "get_partitions_by_names_req";
+ protected final CatalogServiceCatalog catalog_;
+ protected final boolean fallBackToHMSOnErrors_;
+ protected final Metrics metrics_;
+ // TODO handle session configuration
+ protected Configuration serverConf_;
+ protected PartitionExpressionProxy expressionProxy_;
+ protected final String defaultCatalogName_;
+
+ public MetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+ boolean fallBackToHMSOnErrors) {
+ catalog_ = Preconditions.checkNotNull(catalog);
+ metrics_ = Preconditions.checkNotNull(metrics);
+ fallBackToHMSOnErrors_ = fallBackToHMSOnErrors;
+ LOG.info("Fallback to hive metastore service on errors is {}",
+ fallBackToHMSOnErrors_);
+ // load the metastore configuration from the classpath
+ serverConf_ = Preconditions.checkNotNull(MetastoreConf.newMetastoreConf());
+ String className = MetastoreConf
+ .get(serverConf_, ConfVars.EXPRESSION_PROXY_CLASS.getVarname());
+ try {
+ Preconditions.checkNotNull(className);
+ LOG.info("Instantiating {}", className);
+ expressionProxy_ = PartFilterExprUtil.createExpressionProxy(serverConf_);
+ if (expressionProxy_ instanceof DefaultPartitionExpressionProxy) {
+ LOG.error("PartFilterExprUtil.createExpressionProxy returned"
+ + " DefaultPartitionExpressionProxy. Check if hive-exec"
+ + " jar is available in the classpath.");
+ expressionProxy_ = null;
+ }
+ } catch (Exception ex) {
+ LOG.error("Could not instantiate {}", className, ex);
+ }
+ defaultCatalogName_ =
+ MetaStoreUtils.getDefaultCatalog(serverConf_);
+ }
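+  // Note: the expression proxy class is resolved from the metastore configuration
+  // key ConfVars.EXPRESSION_PROXY_CLASS; when it resolves to
+  // DefaultPartitionExpressionProxy it is discarded above, and get_partitions_by_expr
+  // in the subclass falls back to the backing HMS service.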
+
+ @Override
+ public String get_hms_api_version() throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_hms_api_version();
+ }
+ }
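+  // All of the plain delegating methods below follow the same pattern as above:
+  // borrow a MetaStoreClient from the catalog's client pool (try-with-resources
+  // returns it to the pool on close) and forward the call verbatim through the raw
+  // thrift client so the arguments are not modified by IMetastoreClient.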
+
+ @Override
+ public String getMetaConf(String configKey) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().getMetaConf(configKey);
+ }
+ }
+
+ @Override
+ public void setMetaConf(String configKey, String configValue)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().setMetaConf(configKey, configValue);
+ }
+ }
+
+ @Override
+ public void create_catalog(CreateCatalogRequest createCatalogRequest)
+ throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().create_catalog(createCatalogRequest);
+ }
+ }
+
+ @Override
+ public void alter_catalog(AlterCatalogRequest alterCatalogRequest)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().alter_catalog(alterCatalogRequest);
+ }
+ }
+
+ @Override
+ public GetCatalogResponse get_catalog(GetCatalogRequest getCatalogRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_catalog(getCatalogRequest);
+ }
+ }
+
+ @Override
+ public GetCatalogsResponse get_catalogs() throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_catalogs();
+ }
+ }
+
+ @Override
+ public void drop_catalog(DropCatalogRequest dropCatalogRequest)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().drop_catalog(dropCatalogRequest);
+ }
+ }
+
+ @Override
+ public void create_database(Database database)
+ throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().create_database(database);
+ }
+ }
+
+ @Override
+ public Database get_database(String databaseName)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_database(databaseName);
+ }
+ }
+
+ @Override
+ public Database get_database_req(GetDatabaseRequest getDatabaseRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_database_req(getDatabaseRequest);
+ }
+ }
+
+ @Override
+ public void drop_database(String databaseName, boolean deleteData,
+ boolean ignoreUnknownDb)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .drop_database(databaseName, deleteData, ignoreUnknownDb);
+ }
+ }
+
+ @Override
+ public List<String> get_databases(String pattern) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_databases(pattern);
+ }
+ }
+
+ @Override
+ public List<String> get_all_databases() throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_all_databases();
+ }
+ }
+
+ @Override
+ public void alter_database(String dbname, Database database)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().alter_database(dbname, database);
+ }
+ }
+
+ @Override
+ public Type get_type(String name) throws MetaException, NoSuchObjectException,
+ TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_type(name);
+ }
+ }
+
+ @Override
+ public boolean create_type(Type type)
+ throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().create_type(type);
+ }
+ }
+
+ @Override
+ public boolean drop_type(String type)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().drop_type(type);
+ }
+ }
+
+ @Override
+ public Map<String, Type> get_type_all(String s) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_type_all(s);
+ }
+ }
+
+ @Override
+ public List<FieldSchema> get_fields(String dbname, String tblname)
+ throws MetaException, UnknownTableException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_fields(dbname, tblname);
+ }
+ }
+
+ @Override
+ public List<FieldSchema> get_fields_with_environment_context(String dbName,
+ String tblName, EnvironmentContext environmentContext)
+ throws MetaException, UnknownTableException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_fields_with_environment_context(dbName, tblName, environmentContext);
+ }
+ }
+
+ @Override
+ public List<FieldSchema> get_schema(String dbname, String tblname)
+ throws MetaException, UnknownTableException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_schema(dbname, tblname);
+ }
+ }
+
+ @Override
+ public List<FieldSchema> get_schema_with_environment_context(String dbname,
+ String tblname, EnvironmentContext environmentContext)
+ throws MetaException, UnknownTableException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_schema_with_environment_context(dbname, tblname, environmentContext);
+ }
+ }
+
+ @Override
+ public void create_table(Table table)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().create_table(table);
+ }
+ }
+
+ @Override
+ public void create_table_with_environment_context(Table table,
+ EnvironmentContext environmentContext)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .create_table_with_environment_context(table, environmentContext);
+ }
+ }
+
+ @Override
+ public void create_table_with_constraints(Table table,
+ List<SQLPrimaryKey> sqlPrimaryKeys,
+ List<SQLForeignKey> sqlForeignKeys, List<SQLUniqueConstraint> sqlUniqueConstraints,
+ List<SQLNotNullConstraint> sqlNotNullConstraints,
+ List<SQLDefaultConstraint> sqlDefaultConstraints,
+ List<SQLCheckConstraint> sqlCheckConstraints)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().create_table_with_constraints(table,
+ sqlPrimaryKeys, sqlForeignKeys, sqlUniqueConstraints, sqlNotNullConstraints,
+ sqlDefaultConstraints, sqlCheckConstraints);
+ }
+ }
+
+ @Override
+ public void create_table_req(CreateTableRequest createTableRequest)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().create_table_req(createTableRequest);
+ }
+ }
+
+ @Override
+ public void drop_constraint(DropConstraintRequest dropConstraintRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().drop_constraint(dropConstraintRequest);
+ }
+ }
+
+ @Override
+ public void add_primary_key(AddPrimaryKeyRequest addPrimaryKeyRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().add_primary_key(addPrimaryKeyRequest);
+ }
+ }
+
+ @Override
+ public void add_foreign_key(AddForeignKeyRequest addForeignKeyRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().add_foreign_key(addForeignKeyRequest);
+ }
+ }
+
+ @Override
+ public void add_unique_constraint(AddUniqueConstraintRequest addUniqueConstraintRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .add_unique_constraint(addUniqueConstraintRequest);
+ }
+ }
+
+ @Override
+ public void add_not_null_constraint(
+ AddNotNullConstraintRequest addNotNullConstraintRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .add_not_null_constraint(addNotNullConstraintRequest);
+ }
+ }
+
+ @Override
+ public void add_default_constraint(
+ AddDefaultConstraintRequest addDefaultConstraintRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .add_default_constraint(addDefaultConstraintRequest);
+ }
+ }
+
+ @Override
+ public void add_check_constraint(AddCheckConstraintRequest addCheckConstraintRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .add_check_constraint(addCheckConstraintRequest);
+ }
+ }
+
+ @Override
+ public void drop_table(String dbname, String tblname, boolean deleteData)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().drop_table(dbname, tblname, deleteData);
+ }
+ }
+
+ @Override
+ public void drop_table_with_environment_context(String dbname, String tblname,
+ boolean deleteData,
+ EnvironmentContext environmentContext)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .drop_table_with_environment_context(dbname, tblname, deleteData,
+ environmentContext);
+ }
+ }
+
+ @Override
+ public void truncate_table(String dbName, String tblName, List<String> partNames)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().truncate_table(dbName, tblName, partNames);
+ }
+ }
+
+ @Override
+ public TruncateTableResponse truncate_table_req(
+ TruncateTableRequest truncateTableRequest) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .truncate_table_req(truncateTableRequest);
+ }
+ }
+
+ @Override
+ public List<String> get_tables(String dbname, String tblName)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_tables(dbname, tblName);
+ }
+ }
+
+ @Override
+ public List<String> get_tables_by_type(String dbname, String tablePattern,
+ String tableType)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_tables_by_type(dbname,
+ tablePattern, tableType);
+ }
+ }
+
+ @Override
+ public List<Table> get_all_materialized_view_objects_for_rewriting()
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_all_materialized_view_objects_for_rewriting();
+ }
+ }
+
+ @Override
+ public List<String> get_materialized_views_for_rewriting(String dbName)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_materialized_views_for_rewriting(dbName);
+ }
+ }
+
+ @Override
+ public List<TableMeta> get_table_meta(String dbnamePattern, String tblNamePattern,
+ List<String> tableTypes)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_table_meta(dbnamePattern,
+ tblNamePattern, tableTypes);
+ }
+ }
+
+ @Override
+ public List<String> get_all_tables(String dbname) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_all_tables(dbname);
+ }
+ }
+
+ @Override
+ public Table get_table(String dbname, String tblname)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_table(dbname, tblname);
+ }
+ }
+
+ @Override
+ public List<Table> get_table_objects_by_name(String dbname, List<String> list)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_table_objects_by_name(dbname,
+ list);
+ }
+ }
+
+ @Override
+ public List<ExtendedTableInfo> get_tables_ext(GetTablesExtRequest getTablesExtRequest)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_tables_ext(getTablesExtRequest);
+ }
+ }
+
+ /**
+ * This method gets the table from the HMS directly. Additionally, if the request has
+ * {@code getFileMetadata} set, it computes the file metadata and returns it in the
+ * response. For transactional tables, it uses the ValidWriteIdList from the request
+ * along with the current ValidTxnList to return the requested snapshot of the
+ * table's file metadata.
+ */
+ @Override
+ public GetTableResult get_table_req(GetTableRequest getTableRequest)
+ throws MetaException, NoSuchObjectException, TException {
+ String tblName = getTableRequest.getDbName() + "." + getTableRequest.getTblName();
+ LOG.debug(String.format(HMS_FALLBACK_MSG_FORMAT, "get_table_req", tblName));
+ GetTableResult result;
+ ValidTxnList txnList = null;
+ ValidWriteIdList writeIdList = null;
+ String requestWriteIdList = getTableRequest.getValidWriteIdList();
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ result = client.getHiveClient().getThriftClient()
+ .get_table_req(getTableRequest);
+ Table tbl = result.getTable();
+ // return early if file-metadata is not requested
+ if (!getTableRequest.isGetFileMetadata()) {
+ LOG.trace("File metadata is not requested. Returning table {}",
+ tbl.getTableName());
+ return result;
+ }
+ // we need to get the current ValidTxnList to avoid returning
+ // file-metadata for in-progress compactions. If the request does not
+ // include a ValidWriteIdList or if the table is not transactional, we
+ // compute the file-metadata as seen on the file-system.
+ boolean isTransactional = tbl.getParameters() != null && AcidUtils
+ .isTransactionalTable(tbl.getParameters());
+ if (isTransactional && requestWriteIdList != null) {
+ txnList = MetastoreShim.getValidTxns(client.getHiveClient());
+ writeIdList = MetastoreShim
+ .getValidWriteIdListFromString(requestWriteIdList);
+ }
+ }
+ CatalogHmsAPIHelper.loadAndSetFileMetadataFromFs(txnList, writeIdList, result);
+ return result;
+ }
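+
+ // Illustrative client-side sketch (not part of this patch): with the patched
+ // thrift definitions in this change, a caller can ask catalogd's HMS endpoint
+ // for a table together with its file metadata. `hmsClient` is an assumed,
+ // already connected ThriftHiveMetastore.Client pointing at the hms_port
+ // endpoint:
+ //
+ //   GetTableRequest req = new GetTableRequest();
+ //   req.setDbName("functional");
+ //   req.setTblName("alltypes");
+ //   req.setGetFileMetadata(true);
+ //   // for a transactional table, also call req.setValidWriteIdList(...) so
+ //   // the returned file metadata reflects a consistent snapshot
+ //   GetTableResult result = hmsClient.get_table_req(req);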
+
+ @Override
+ public GetTablesResult get_table_objects_by_name_req(GetTablesRequest getTablesRequest)
+ throws MetaException, InvalidOperationException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_table_objects_by_name_req(getTablesRequest);
+ }
+ }
+
+ @Override
+ public Materialization get_materialization_invalidation_info(
+ CreationMetadata creationMetadata, String validTxnList)
+ throws MetaException, InvalidOperationException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_materialization_invalidation_info(creationMetadata, validTxnList);
+ }
+ }
+
+ @Override
+ public void update_creation_metadata(String catName, String dbName, String tblName,
+ CreationMetadata creationMetadata)
+ throws MetaException, InvalidOperationException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().update_creation_metadata(catName,
+ dbName, tblName, creationMetadata);
+ }
+ }
+
+ @Override
+ public List<String> get_table_names_by_filter(String dbname, String tblname,
+ short maxParts)
+ throws MetaException, InvalidOperationException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_table_names_by_filter(dbname,
+ tblname, maxParts);
+ }
+ }
+
+ @Override
+ public void alter_table(String dbname, String tblName, Table newTable)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().alter_table(dbname, tblName, newTable);
+ }
+ }
+
+ @Override
+ public void alter_table_with_environment_context(String dbname, String tblName,
+ Table table,
+ EnvironmentContext environmentContext)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .alter_table_with_environment_context(dbname,
+ tblName, table, environmentContext);
+ }
+ }
+
+ @Override
+ public void alter_table_with_cascade(String dbname, String tblName, Table table,
+ boolean cascade)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().alter_table_with_cascade(dbname, tblName,
+ table, cascade);
+ }
+ }
+
+ @Override
+ public AlterTableResponse alter_table_req(AlterTableRequest alterTableRequest)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().alter_table_req(alterTableRequest);
+ }
+ }
+
+ @Override
+ public Partition add_partition(Partition partition)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().add_partition(partition);
+ }
+ }
+
+ @Override
+ public Partition add_partition_with_environment_context(Partition partition,
+ EnvironmentContext environmentContext)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .add_partition_with_environment_context(partition, environmentContext);
+ }
+ }
+
+ @Override
+ public int add_partitions(List<Partition> partitionList)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().add_partitions(partitionList);
+ }
+ }
+
+ @Override
+ public int add_partitions_pspec(List<PartitionSpec> list)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().add_partitions_pspec(list);
+ }
+ }
+
+ @Override
+ public Partition append_partition(String dbname, String tblName, List<String> partVals)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .append_partition(dbname, tblName, partVals);
+ }
+ }
+
+ @Override
+ public AddPartitionsResult add_partitions_req(AddPartitionsRequest addPartitionsRequest)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .add_partitions_req(addPartitionsRequest);
+ }
+ }
+
+ @Override
+ public Partition append_partition_with_environment_context(String dbname,
+ String tblname,
+ List<String> partVals, EnvironmentContext environmentContext)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .append_partition_with_environment_context(dbname, tblname, partVals,
+ environmentContext);
+ }
+ }
+
+ @Override
+ public Partition append_partition_by_name(String dbname, String tblname,
+ String partName)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .append_partition_by_name(dbname, tblname, partName);
+ }
+ }
+
+ @Override
+ public Partition append_partition_by_name_with_environment_context(String dbname,
+ String tblname, String partName, EnvironmentContext environmentContext)
+ throws InvalidObjectException, AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .append_partition_by_name_with_environment_context(dbname, tblname, partName,
+ environmentContext);
+ }
+ }
+
+ @Override
+ public boolean drop_partition(String dbname, String tblname, List<String> partVals,
+ boolean deleteData)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .drop_partition(dbname, tblname, partVals, deleteData);
+ }
+ }
+
+ @Override
+ public boolean drop_partition_with_environment_context(String dbname, String tblname,
+ List<String> partNames, boolean deleteData, EnvironmentContext environmentContext)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .drop_partition_with_environment_context(dbname, tblname, partNames, deleteData,
+ environmentContext);
+ }
+ }
+
+ @Override
+ public boolean drop_partition_by_name(String dbname, String tblname, String partName,
+ boolean deleteData)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().drop_partition_by_name(dbname,
+ tblname, partName, deleteData);
+ }
+ }
+
+ @Override
+ public boolean drop_partition_by_name_with_environment_context(String dbName,
+ String tableName,
+ String partName, boolean deleteData, EnvironmentContext envContext)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .drop_partition_by_name_with_environment_context(dbName, tableName, partName,
+ deleteData, envContext);
+ }
+ }
+
+ @Override
+ public DropPartitionsResult drop_partitions_req(
+ DropPartitionsRequest dropPartitionsRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .drop_partitions_req(dropPartitionsRequest);
+ }
+ }
+
+ @Override
+ public Partition get_partition(String dbName, String tblName, List<String> values)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_partition(dbName, tblName,
+ values);
+ }
+ }
+
+ @Override
+ public Partition exchange_partition(Map<String, String> partitionSpecMap,
+ String sourceDb, String sourceTbl,
+ String destDb, String destTbl)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .exchange_partition(partitionSpecMap, sourceDb, sourceTbl, destDb,
+ destTbl);
+ }
+ }
+
+ @Override
+ public List<Partition> exchange_partitions(Map<String, String> partitionSpecs,
+ String sourceDb, String sourceTable, String destDb,
+ String destinationTableName)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().exchange_partitions(partitionSpecs,
+ sourceDb, sourceTable, destDb, destinationTableName);
+ }
+ }
+
+ @Override
+ public Partition get_partition_with_auth(String dbname, String tblName,
+ List<String> values,
+ String user, List<String> groups)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_partition_with_auth(dbname,
+ tblName, values, user,
+ groups);
+ }
+ }
+
+ @Override
+ public Partition get_partition_by_name(String dbName, String tblName,
+ String partitionName)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_partition_by_name(dbName,
+ tblName, partitionName);
+ }
+ }
+
+ @Override
+ public List<Partition> get_partitions(String dbName, String tblName, short maxLimit)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partitions(dbName, tblName, maxLimit);
+ }
+ }
+
+ @Override
+ public List<Partition> get_partitions_with_auth(String dbName, String tblName,
+ short maxParts, String username,
+ List<String> groups) throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_partitions_with_auth(dbName,
+ tblName, maxParts, username,
+ groups);
+ }
+ }
+
+ @Override
+ public List<PartitionSpec> get_partitions_pspec(String dbName, String tblName,
+ int maxParts)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partitions_pspec(dbName, tblName, maxParts);
+ }
+ }
+
+ @Override
+ public GetPartitionsResponse get_partitions_with_specs(GetPartitionsRequest request)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partitions_with_specs(request);
+ }
+ }
+
+ @Override
+ public List<String> get_partition_names(String dbName, String tblName, short maxParts)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_partition_names(dbName,
+ tblName, maxParts);
+ }
+ }
+
+ @Override
+ public PartitionValuesResponse get_partition_values(
+ PartitionValuesRequest partitionValuesRequest)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partition_values(partitionValuesRequest);
+ }
+ }
+
+ @Override
+ public List<Partition> get_partitions_ps(String dbName, String tblName,
+ List<String> partValues,
+ short maxParts) throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partitions_ps(dbName, tblName, partValues, maxParts);
+ }
+ }
+
+ @Override
+ public List<Partition> get_partitions_ps_with_auth(String dbName, String tblName,
+ List<String> partVals, short maxParts, String user, List<String> groups)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partitions_ps_with_auth(dbName, tblName,
+ partVals, maxParts, user, groups);
+ }
+ }
+
+ @Override
+ public List<String> get_partition_names_ps(String dbName, String tblName,
+ List<String> partitionNames,
+ short maxParts) throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partition_names_ps(dbName, tblName,
+ partitionNames, maxParts);
+ }
+ }
+
+ @Override
+ public List<Partition> get_partitions_by_filter(String dbName, String tblName,
+ String filter, short maxParts)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partitions_by_filter(dbName, tblName,
+ filter, maxParts);
+ }
+ }
+
+ @Override
+ public List<PartitionSpec> get_part_specs_by_filter(String dbName, String tblName,
+ String filter,
+ int maxParts) throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_part_specs_by_filter(dbName, tblName, filter,
+ maxParts);
+ }
+ }
+
+ @Override
+ public GetFieldsResponse get_fields_req(GetFieldsRequest req)
+ throws MetaException, UnknownTableException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ List<FieldSchema> fields = client.getHiveClient().getThriftClient()
+ .get_fields_with_environment_context(MetaStoreUtils
+ .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+ req.getTblName(), req.getEnvContext());
+ GetFieldsResponse res = new GetFieldsResponse();
+ res.setFields(fields);
+ return res;
+ }
+ }
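+
+ // A note on the translation above: the request-style APIs carry the catalog
+ // name separately, while the legacy positional APIs encode it into the
+ // database name, which is what MetaStoreUtils.prependCatalogToDbName does.
+ // For example (assuming Hive's usual "@catalog#db" encoding; a null catalog
+ // name falls back to the default catalog configured in serverConf_):
+ //
+ //   MetaStoreUtils.prependCatalogToDbName("hive", "functional", serverConf_)
+ //   // -> "@hive#functional"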
+
+ @Override
+ public GetSchemaResponse get_schema_req(GetSchemaRequest req)
+ throws MetaException, UnknownTableException, UnknownDBException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ // TODO Remove the usage of old API here once this API is ported to cdpd-master
+ List<FieldSchema> fields = client.getHiveClient().getThriftClient()
+ .get_schema_with_environment_context(MetaStoreUtils
+ .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+ req.getTblName(), req.getEnvContext());
+ GetSchemaResponse res = new GetSchemaResponse();
+ res.setFields(fields);
+ return res;
+ }
+ }
+
+ @Override
+ public GetPartitionResponse get_partition_req(GetPartitionRequest req)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ Partition p =
+ client.getHiveClient().getThriftClient().get_partition(
+ MetaStoreUtils
+ .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+ req.getTblName(), req.getPartVals());
+ GetPartitionResponse res = new GetPartitionResponse();
+ res.setPartition(p);
+ return res;
+ }
+ }
+
+ @Override
+ public PartitionsResponse get_partitions_req(PartitionsRequest req)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ List<Partition> partitions =
+ client.getHiveClient().getThriftClient().get_partitions(MetaStoreUtils
+ .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+ req.getTblName(), req.getMaxParts());
+ PartitionsResponse res = new PartitionsResponse();
+ res.setPartitions(partitions);
+ return res;
+ }
+ }
+
+ @Override
+ public GetPartitionNamesPsResponse get_partition_names_ps_req(
+ GetPartitionNamesPsRequest req)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ List<String> names = client.getHiveClient().getThriftClient()
+ .get_partition_names_ps(MetaStoreUtils
+ .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+ req.getTblName(), req.getPartValues(), req.getMaxParts());
+ GetPartitionNamesPsResponse res = new GetPartitionNamesPsResponse();
+ res.setNames(names);
+ return res;
+ }
+ }
+
+ @Override
+ public GetPartitionsPsWithAuthResponse get_partitions_ps_with_auth_req(
+ GetPartitionsPsWithAuthRequest req)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ List<Partition> partitions = client.getHiveClient().getThriftClient()
+ .get_partitions_ps_with_auth(MetaStoreUtils
+ .prependCatalogToDbName(req.getCatName(), req.getDbName(), serverConf_),
+ req.getTblName(), req.getPartVals(), req.getMaxParts(),
+ req.getUserName(), req.getGroupNames());
+ GetPartitionsPsWithAuthResponse res = new GetPartitionsPsWithAuthResponse();
+ res.setPartitions(partitions);
+ return res;
+ }
+ }
+
+ @Override
+ public PartitionsByExprResult get_partitions_by_expr(
+ PartitionsByExprRequest partitionsByExprRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partitions_by_expr(partitionsByExprRequest);
+ }
+ }
+
+ @Override
+ public int get_num_partitions_by_filter(String dbName, String tblName, String filter)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_num_partitions_by_filter(dbName,
+ tblName, filter);
+ }
+ }
+
+ @Override
+ public List<Partition> get_partitions_by_names(String dbName, String tblName,
+ List<String> partitionNames)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partitions_by_names(dbName, tblName,
+ partitionNames);
+ }
+ }
+
+ /**
+ * Util method to evaluate whether the received exception needs to be thrown to the
+ * client, based on the server configuration. If fallback is enabled, this method
+ * returns normally so the caller can fall back to the backing HMS service.
+ *
+ * @param cause The underlying exception received from the Catalog.
+ * @param apiName The HMS API name which threw the given exception.
+ * @throws TException The given exception if it is a TException; otherwise a
+ * MetaException wrapping the cause.
+ */
+ protected void throwIfNoFallback(Exception cause, String apiName)
+ throws TException {
+ LOG.debug("Received exception while executing {}", apiName, cause);
+ if (fallBackToHMSOnErrors_) return;
+ if (cause instanceof TException) throw (TException) cause;
+ // if this is not a TException, wrap it in a MetaException
+ throw new MetaException(
+ String.format(METAEXCEPTION_MSG_FORMAT, apiName, cause));
+ }
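+
+ // Usage sketch (illustrative only): a caching subclass would typically guard
+ // its fast path with throwIfNoFallback() so that, when
+ // fallback_to_hms_on_errors is true, a failed cache lookup degrades to the
+ // HMS-backed implementation in this class. getTableFromCache() below is a
+ // hypothetical helper, not an API introduced by this patch:
+ //
+ //   @Override
+ //   public GetTableResult get_table_req(GetTableRequest req) throws TException {
+ //     try {
+ //       return getTableFromCache(req);  // hypothetical catalogd cache lookup
+ //     } catch (Exception e) {
+ //       throwIfNoFallback(e, "get_table_req");  // rethrows when fallback is off
+ //     }
+ //     return super.get_table_req(req);  // fall back to the backing HMS
+ //   }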
+
+ /**
+ * This method gets the partitions for the given list of names from HMS. Additionally,
+ * if the {@code getFileMetadata} flag is set in the request, it also computes the file
+ * metadata and sets it in the partitions which are returned.
+ *
+ * @throws TException
+ */
+ @Override
+ public GetPartitionsByNamesResult get_partitions_by_names_req(
+ GetPartitionsByNamesRequest getPartitionsByNamesRequest) throws TException {
+ String tblName =
+ getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
+ .getTbl_name();
+ LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_NAMES, tblName));
+ boolean getFileMetadata = getPartitionsByNamesRequest.isGetFileMetadata();
+ GetPartitionsByNamesResult result;
+ ValidWriteIdList validWriteIdList = null;
+ ValidTxnList validTxnList = null;
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ result = client.getHiveClient().getThriftClient()
+ .get_partitions_by_names_req(getPartitionsByNamesRequest);
+ // if file-metadata is not requested, return early
+ if (!getFileMetadata) return result;
+ // we don't really know if the requested partitions are for a transactional table
+ // or not, so we get the table from HMS to confirm.
+ // TODO: maybe we could assume that if ValidWriteIdList is not set, the table is
+ // not transactional
+ String[] parsedCatDbName = MetaStoreUtils
+ .parseDbName(getPartitionsByNamesRequest.getDb_name(), serverConf_);
+ Table tbl = client.getHiveClient().getTable(parsedCatDbName[0], parsedCatDbName[1],
+ getPartitionsByNamesRequest.getTbl_name(),
+ getPartitionsByNamesRequest.getValidWriteIdList());
+ boolean isTransactional = tbl.getParameters() != null && AcidUtils
+ .isTransactionalTable(tbl.getParameters());
+ if (isTransactional) {
+ if (getPartitionsByNamesRequest.getValidWriteIdList() == null) {
+ throw new MetaException(
+ "ValidWriteIdList is not set when requesting partitions for table " + tbl
+ .getDbName() + "." + tbl.getTableName());
+ }
+ validWriteIdList = MetastoreShim
+ .getValidWriteIdListFromString(
+ getPartitionsByNamesRequest.getValidWriteIdList());
+ validTxnList = client.getHiveClient().getValidTxns();
+ }
+ }
+ CatalogHmsAPIHelper
+ .loadAndSetFileMetadataFromFs(validTxnList, validWriteIdList, result);
+ return result;
+ }
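+
+ // Illustrative request (a sketch, not part of this patch): fetching two
+ // partitions along with their file metadata. For a transactional table the
+ // ValidWriteIdList must be set, otherwise the code above throws a
+ // MetaException. `writeIds` is an assumed ValidWriteIdList for the table and
+ // `handler` stands for this service handler:
+ //
+ //   GetPartitionsByNamesRequest req =
+ //       new GetPartitionsByNamesRequest("functional", "alltypes");
+ //   req.setNames(Arrays.asList("year=2009/month=1", "year=2009/month=2"));
+ //   req.setGetFileMetadata(true);
+ //   req.setValidWriteIdList(writeIds.writeToString());
+ //   GetPartitionsByNamesResult result = handler.get_partitions_by_names_req(req);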
+
+ @Override
+ public void alter_partition(String dbName, String tblName, Partition partition)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .alter_partition(dbName, tblName, partition);
+ }
+ }
+
+ @Override
+ public void alter_partitions(String dbName, String tblName, List<Partition> partitions)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .alter_partitions(dbName, tblName, partitions);
+ }
+ }
+
+ @Override
+ public void alter_partitions_with_environment_context(String s, String s1,
+ List<Partition> list, EnvironmentContext environmentContext)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .alter_partitions_with_environment_context(s, s1, list, environmentContext);
+ }
+ }
+
+ @Override
+ public AlterPartitionsResponse alter_partitions_req(
+ AlterPartitionsRequest alterPartitionsRequest)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .alter_partitions_req(alterPartitionsRequest);
+ }
+ }
+
+ @Override
+ public void alter_partition_with_environment_context(String dbName, String tblName,
+ Partition partition, EnvironmentContext environmentContext)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .alter_partition_with_environment_context(dbName, tblName, partition,
+ environmentContext);
+ }
+ }
+
+ @Override
+ public void rename_partition(String dbName, String tblName, List<String> list,
+ Partition partition) throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .rename_partition(dbName, tblName, list, partition);
+ }
+ }
+
+ @Override
+ public RenamePartitionResponse rename_partition_req(
+ RenamePartitionRequest renamePartitionRequest)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .rename_partition_req(renamePartitionRequest);
+ }
+ }
+
+ @Override
+ public boolean partition_name_has_valid_characters(List<String> list, boolean b)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .partition_name_has_valid_characters(list, b);
+ }
+ }
+
+ @Override
+ public String get_config_value(String key, String defaultVal)
+ throws ConfigValSecurityException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_config_value(key, defaultVal);
+ }
+ }
+
+ @Override
+ public List<String> partition_name_to_vals(String name)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().partition_name_to_vals(name);
+ }
+ }
+
+ @Override
+ public Map<String, String> partition_name_to_spec(String name)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().partition_name_to_spec(name);
+ }
+ }
+
+ @Override
+ public void markPartitionForEvent(String s, String s1, Map<String, String> map,
+ PartitionEventType partitionEventType) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .markPartitionForEvent(s, s1, map, partitionEventType);
+ }
+ }
+
+ @Override
+ public boolean isPartitionMarkedForEvent(String s, String s1, Map<String, String> map,
+ PartitionEventType partitionEventType) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().isPartitionMarkedForEvent(s, s1,
+ map, partitionEventType);
+ }
+ }
+
+ @Override
+ public PrimaryKeysResponse get_primary_keys(PrimaryKeysRequest primaryKeysRequest)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_primary_keys(primaryKeysRequest);
+ }
+ }
+
+ @Override
+ public ForeignKeysResponse get_foreign_keys(ForeignKeysRequest foreignKeysRequest)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_foreign_keys(foreignKeysRequest);
+ }
+ }
+
+ @Override
+ public UniqueConstraintsResponse get_unique_constraints(
+ UniqueConstraintsRequest uniqueConstraintsRequest)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_unique_constraints(uniqueConstraintsRequest);
+ }
+ }
+
+ @Override
+ public NotNullConstraintsResponse get_not_null_constraints(
+ NotNullConstraintsRequest notNullConstraintsRequest)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_not_null_constraints(notNullConstraintsRequest);
+ }
+ }
+
+ @Override
+ public DefaultConstraintsResponse get_default_constraints(
+ DefaultConstraintsRequest defaultConstraintsRequest)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_default_constraints(defaultConstraintsRequest);
+ }
+ }
+
+ @Override
+ public CheckConstraintsResponse get_check_constraints(
+ CheckConstraintsRequest checkConstraintsRequest)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_check_constraints(checkConstraintsRequest);
+ }
+ }
+
+ @Override
+ public boolean update_table_column_statistics(ColumnStatistics columnStatistics)
+ throws NoSuchObjectException, InvalidObjectException, MetaException,
+ InvalidInputException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .update_table_column_statistics(columnStatistics);
+ }
+ }
+
+ @Override
+ public boolean update_partition_column_statistics(ColumnStatistics columnStatistics)
+ throws NoSuchObjectException, InvalidObjectException, MetaException,
+ InvalidInputException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .update_partition_column_statistics(columnStatistics);
+ }
+ }
+
+ @Override
+ public SetPartitionsStatsResponse update_table_column_statistics_req(
+ SetPartitionsStatsRequest setPartitionsStatsRequest)
+ throws NoSuchObjectException, InvalidObjectException, MetaException,
+ InvalidInputException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .update_table_column_statistics_req(setPartitionsStatsRequest);
+ }
+ }
+
+ @Override
+ public SetPartitionsStatsResponse update_partition_column_statistics_req(
+ SetPartitionsStatsRequest setPartitionsStatsRequest)
+ throws NoSuchObjectException, InvalidObjectException, MetaException,
+ InvalidInputException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .update_partition_column_statistics_req(setPartitionsStatsRequest);
+ }
+ }
+
+ @Override
+ public ColumnStatistics get_table_column_statistics(String s, String s1, String s2)
+ throws NoSuchObjectException, MetaException, InvalidInputException,
+ InvalidObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_table_column_statistics(s, s1,
+ s2);
+ }
+ }
+
+ @Override
+ public ColumnStatistics get_partition_column_statistics(String s, String s1, String s2,
+ String s3)
+ throws NoSuchObjectException, MetaException, InvalidInputException,
+ InvalidObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_partition_column_statistics(s,
+ s1, s2, s3);
+ }
+ }
+
+ @Override
+ public TableStatsResult get_table_statistics_req(TableStatsRequest tableStatsRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_table_statistics_req(tableStatsRequest);
+ }
+ }
+
+ @Override
+ public PartitionsStatsResult get_partitions_statistics_req(
+ PartitionsStatsRequest partitionsStatsRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_partitions_statistics_req(partitionsStatsRequest);
+ }
+ }
+
+ @Override
+ public AggrStats get_aggr_stats_for(PartitionsStatsRequest partitionsStatsRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_aggr_stats_for(partitionsStatsRequest);
+ }
+ }
+
+ @Override
+ public boolean set_aggr_stats_for(SetPartitionsStatsRequest setPartitionsStatsRequest)
+ throws NoSuchObjectException, InvalidObjectException, MetaException,
+ InvalidInputException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .set_aggr_stats_for(setPartitionsStatsRequest);
+ }
+ }
+
+ @Override
+ public boolean delete_partition_column_statistics(String dbName, String tblName,
+ String partName,
+ String colName, String engine)
+ throws NoSuchObjectException, MetaException, InvalidObjectException,
+ InvalidInputException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .delete_partition_column_statistics(dbName, tblName,
+ partName, colName, engine);
+ }
+ }
+
+ @Override
+ public boolean delete_table_column_statistics(String dbName, String tblName,
+ String columnName, String engine)
+ throws NoSuchObjectException, MetaException, InvalidObjectException,
+ InvalidInputException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .delete_table_column_statistics(dbName,
+ tblName, columnName, engine);
+ }
+ }
+
+ @Override
+ public void create_function(Function function)
+ throws AlreadyExistsException, InvalidObjectException, MetaException,
+ NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().create_function(function);
+ }
+ }
+
+ @Override
+ public void drop_function(String dbName, String funcName)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().drop_function(dbName, funcName);
+ }
+ }
+
+ @Override
+ public void alter_function(String s, String s1, Function function)
+ throws InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().alter_function(s, s1, function);
+ }
+ }
+
+ @Override
+ public List<String> get_functions(String s, String s1)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_functions(s, s1);
+ }
+ }
+
+ @Override
+ public Function get_function(String s, String s1)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_function(s, s1);
+ }
+ }
+
+ @Override
+ public GetAllFunctionsResponse get_all_functions() throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_all_functions();
+ }
+ }
+
+ @Override
+ public boolean create_role(Role role) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().create_role(role);
+ }
+ }
+
+ @Override
+ public boolean drop_role(String s) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().drop_role(s);
+ }
+ }
+
+ @Override
+ public List<String> get_role_names() throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_role_names();
+ }
+ }
+
+ @Override
+ public boolean grant_role(String roleName, String userName, PrincipalType principalType,
+ String grantor,
+ PrincipalType grantorType, boolean grantOption) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .grant_role(roleName, userName, principalType,
+ grantor, grantorType, grantOption);
+ }
+ }
+
+ @Override
+ public boolean revoke_role(String s, String s1, PrincipalType principalType)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().revoke_role(s, s1, principalType);
+ }
+ }
+
+ @Override
+ public List<Role> list_roles(String s, PrincipalType principalType)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().list_roles(s, principalType);
+ }
+ }
+
+ @Override
+ public GrantRevokeRoleResponse grant_revoke_role(
+ GrantRevokeRoleRequest grantRevokeRoleRequest) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .grant_revoke_role(grantRevokeRoleRequest);
+ }
+ }
+
+ @Override
+ public GetPrincipalsInRoleResponse get_principals_in_role(
+ GetPrincipalsInRoleRequest getPrincipalsInRoleRequest)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_principals_in_role(getPrincipalsInRoleRequest);
+ }
+ }
+
+ @Override
+ public GetRoleGrantsForPrincipalResponse get_role_grants_for_principal(
+ GetRoleGrantsForPrincipalRequest getRoleGrantsForPrincipalRequest)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_role_grants_for_principal(getRoleGrantsForPrincipalRequest);
+ }
+ }
+
+ @Override
+ public PrincipalPrivilegeSet get_privilege_set(HiveObjectRef hiveObjectRef, String s,
+ List<String> list) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_privilege_set(hiveObjectRef,
+ s, list);
+ }
+ }
+
+ @Override
+ public List<HiveObjectPrivilege> list_privileges(String s, PrincipalType principalType,
+ HiveObjectRef hiveObjectRef) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().list_privileges(s, principalType,
+ hiveObjectRef);
+ }
+ }
+
+ @Override
+ public boolean grant_privileges(PrivilegeBag privilegeBag)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().grant_privileges(privilegeBag);
+ }
+ }
+
+ @Override
+ public boolean revoke_privileges(PrivilegeBag privilegeBag)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().revoke_privileges(privilegeBag);
+ }
+ }
+
+ @Override
+ public GrantRevokePrivilegeResponse grant_revoke_privileges(
+ GrantRevokePrivilegeRequest grantRevokePrivilegeRequest)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .grant_revoke_privileges(grantRevokePrivilegeRequest);
+ }
+ }
+
+ @Override
+ public GrantRevokePrivilegeResponse refresh_privileges(HiveObjectRef hiveObjectRef,
+ String s, GrantRevokePrivilegeRequest grantRevokePrivilegeRequest)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().refresh_privileges(hiveObjectRef, s,
+ grantRevokePrivilegeRequest);
+ }
+ }
+
+ @Override
+ public List<String> set_ugi(String s, List<String> list)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().set_ugi(s, list);
+ }
+ }
+
+ @Override
+ public String get_delegation_token(String s, String s1)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_delegation_token(s, s1);
+ }
+ }
+
+ @Override
+ public long renew_delegation_token(String s) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().renew_delegation_token(s);
+ }
+ }
+
+ @Override
+ public void cancel_delegation_token(String s) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().cancel_delegation_token(s);
+ }
+ }
+
+ @Override
+ public boolean add_token(String tokenIdentifier, String delegationToken)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .add_token(tokenIdentifier, delegationToken);
+ }
+ }
+
+ @Override
+ public boolean remove_token(String tokenIdentifier) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().remove_token(tokenIdentifier);
+ }
+ }
+
+ @Override
+ public String get_token(String s) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_token(s);
+ }
+ }
+
+ @Override
+ public List<String> get_all_token_identifiers() throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_all_token_identifiers();
+ }
+ }
+
+ @Override
+ public int add_master_key(String s) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().add_master_key(s);
+ }
+ }
+
+ @Override
+ public void update_master_key(int i, String s)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().update_master_key(i, s);
+ }
+ }
+
+ @Override
+ public boolean remove_master_key(int i) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().remove_master_key(i);
+ }
+ }
+
+ @Override
+ public List<String> get_master_keys() throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_master_keys();
+ }
+ }
+
+ @Override
+ public GetOpenTxnsResponse get_open_txns() throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_open_txns();
+ }
+ }
+
+ @Override
+ public GetOpenTxnsInfoResponse get_open_txns_info() throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_open_txns_info();
+ }
+ }
+
+ @Override
+ public OpenTxnsResponse open_txns(OpenTxnRequest openTxnRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().open_txns(openTxnRequest);
+ }
+ }
+
+ @Override
+ public void abort_txn(AbortTxnRequest abortTxnRequest)
+ throws NoSuchTxnException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().abort_txn(abortTxnRequest);
+ }
+ }
+
+ @Override
+ public void abort_txns(AbortTxnsRequest abortTxnsRequest)
+ throws NoSuchTxnException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().abort_txns(abortTxnsRequest);
+ }
+ }
+
+ @Override
+ public void commit_txn(CommitTxnRequest commitTxnRequest)
+ throws NoSuchTxnException, TxnAbortedException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().commit_txn(commitTxnRequest);
+ }
+ }
+
+ @Override
+ public void repl_tbl_writeid_state(
+ ReplTblWriteIdStateRequest replTblWriteIdStateRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .repl_tbl_writeid_state(replTblWriteIdStateRequest);
+ }
+ }
+
+ @Override
+ public GetValidWriteIdsResponse get_valid_write_ids(
+ GetValidWriteIdsRequest getValidWriteIdsRequest)
+ throws NoSuchTxnException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_valid_write_ids(getValidWriteIdsRequest);
+ }
+ }
+
+ @Override
+ public AllocateTableWriteIdsResponse allocate_table_write_ids(
+ AllocateTableWriteIdsRequest allocateTableWriteIdsRequest)
+ throws NoSuchTxnException, TxnAbortedException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .allocate_table_write_ids(allocateTableWriteIdsRequest);
+ }
+ }
+
+ @Override
+ public LockResponse lock(LockRequest lockRequest)
+ throws NoSuchTxnException, TxnAbortedException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().lock(lockRequest);
+ }
+ }
+
+ @Override
+ public LockResponse check_lock(CheckLockRequest checkLockRequest)
+ throws NoSuchTxnException, TxnAbortedException, NoSuchLockException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().check_lock(checkLockRequest);
+ }
+ }
+
+ @Override
+ public void unlock(UnlockRequest unlockRequest)
+ throws NoSuchLockException, TxnOpenException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().unlock(unlockRequest);
+ }
+ }
+
+ @Override
+ public ShowLocksResponse show_locks(ShowLocksRequest showLocksRequest)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().show_locks(showLocksRequest);
+ }
+ }
+
+ @Override
+ public void heartbeat(HeartbeatRequest heartbeatRequest)
+ throws NoSuchLockException, NoSuchTxnException, TxnAbortedException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().heartbeat(heartbeatRequest);
+ }
+ }
+
+ @Override
+ public HeartbeatTxnRangeResponse heartbeat_txn_range(
+ HeartbeatTxnRangeRequest heartbeatTxnRangeRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .heartbeat_txn_range(heartbeatTxnRangeRequest);
+ }
+ }
+
+ @Override
+ public void compact(CompactionRequest compactionRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().compact(compactionRequest);
+ }
+ }
+
+ @Override
+ public CompactionResponse compact2(CompactionRequest compactionRequest)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().compact2(compactionRequest);
+ }
+ }
+
+ @Override
+ public ShowCompactResponse show_compact(ShowCompactRequest showCompactRequest)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().show_compact(showCompactRequest);
+ }
+ }
+
+ @Override
+ public void add_dynamic_partitions(AddDynamicPartitions addDynamicPartitions)
+ throws NoSuchTxnException, TxnAbortedException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .add_dynamic_partitions(addDynamicPartitions);
+ }
+ }
+
+ @Override
+ public OptionalCompactionInfoStruct find_next_compact(String s)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().find_next_compact(s);
+ }
+ }
+
+ @Override
+ public void update_compactor_state(CompactionInfoStruct compactionInfoStruct, long l)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .update_compactor_state(compactionInfoStruct, l);
+ }
+ }
+
+ @Override
+ public List<String> find_columns_with_stats(CompactionInfoStruct compactionInfoStruct)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .find_columns_with_stats(compactionInfoStruct);
+ }
+ }
+
+ @Override
+ public void mark_cleaned(CompactionInfoStruct compactionInfoStruct)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().mark_cleaned(compactionInfoStruct);
+ }
+ }
+
+ @Override
+ public void mark_compacted(CompactionInfoStruct compactionInfoStruct)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().mark_compacted(compactionInfoStruct);
+ }
+ }
+
+ @Override
+ public void mark_failed(CompactionInfoStruct compactionInfoStruct)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().mark_failed(compactionInfoStruct);
+ }
+ }
+
+ @Override
+ public MaxAllocatedTableWriteIdResponse get_max_allocated_table_write_id(
+ MaxAllocatedTableWriteIdRequest rqst)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_max_allocated_table_write_id(rqst);
+ }
+ }
+
+ @Override
+ public void seed_write_id(SeedTableWriteIdsRequest rqst)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().seed_write_id(rqst);
+ }
+ }
+
+ @Override
+ public void seed_txn_id(SeedTxnIdRequest rqst)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().seed_txn_id(rqst);
+ }
+ }
+
+ @Override
+ public void set_hadoop_jobid(String s, long l) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().set_hadoop_jobid(s, l);
+ }
+ }
+
+ @Override
+ public NotificationEventResponse get_next_notification(
+ NotificationEventRequest notificationEventRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_next_notification(notificationEventRequest);
+ }
+ }
+
+ @Override
+ public CurrentNotificationEventId get_current_notificationEventId() throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_current_notificationEventId();
+ }
+ }
+
+ @Override
+ public NotificationEventsCountResponse get_notification_events_count(
+ NotificationEventsCountRequest notificationEventsCountRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_notification_events_count(notificationEventsCountRequest);
+ }
+ }
+
+ @Override
+ public FireEventResponse fire_listener_event(FireEventRequest fireEventRequest)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .fire_listener_event(fireEventRequest);
+ }
+ }
+
+ @Override
+ public void flushCache() throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().flushCache();
+ }
+ }
+
+ @Override
+ public WriteNotificationLogResponse add_write_notification_log(
+ WriteNotificationLogRequest writeNotificationLogRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .add_write_notification_log(writeNotificationLogRequest);
+ }
+ }
+
+ @Override
+ public CmRecycleResponse cm_recycle(CmRecycleRequest cmRecycleRequest)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().cm_recycle(cmRecycleRequest);
+ }
+ }
+
+ @Override
+ public GetFileMetadataByExprResult get_file_metadata_by_expr(
+ GetFileMetadataByExprRequest getFileMetadataByExprRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_file_metadata_by_expr(getFileMetadataByExprRequest);
+ }
+ }
+
+ @Override
+ public GetFileMetadataResult get_file_metadata(
+ GetFileMetadataRequest getFileMetadataRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_file_metadata(getFileMetadataRequest);
+ }
+ }
+
+ @Override
+ public PutFileMetadataResult put_file_metadata(
+ PutFileMetadataRequest putFileMetadataRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .put_file_metadata(putFileMetadataRequest);
+ }
+ }
+
+ @Override
+ public ClearFileMetadataResult clear_file_metadata(
+ ClearFileMetadataRequest clearFileMetadataRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .clear_file_metadata(clearFileMetadataRequest);
+ }
+ }
+
+ @Override
+ public CacheFileMetadataResult cache_file_metadata(
+ CacheFileMetadataRequest cacheFileMetadataRequest) throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .cache_file_metadata(cacheFileMetadataRequest);
+ }
+ }
+
+ @Override
+ public String get_metastore_db_uuid() throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_metastore_db_uuid();
+ }
+ }
+
+ @Override
+ public WMCreateResourcePlanResponse create_resource_plan(
+ WMCreateResourcePlanRequest wmCreateResourcePlanRequest)
+ throws AlreadyExistsException, InvalidObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .create_resource_plan(wmCreateResourcePlanRequest);
+ }
+ }
+
+ @Override
+ public WMGetResourcePlanResponse get_resource_plan(
+ WMGetResourcePlanRequest wmGetResourcePlanRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_resource_plan(wmGetResourcePlanRequest);
+ }
+ }
+
+ @Override
+ public WMGetActiveResourcePlanResponse get_active_resource_plan(
+ WMGetActiveResourcePlanRequest wmGetActiveResourcePlanRequest)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_active_resource_plan(wmGetActiveResourcePlanRequest);
+ }
+ }
+
+ @Override
+ public WMGetAllResourcePlanResponse get_all_resource_plans(
+ WMGetAllResourcePlanRequest wmGetAllResourcePlanRequest)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_all_resource_plans(wmGetAllResourcePlanRequest);
+ }
+ }
+
+ @Override
+ public WMAlterResourcePlanResponse alter_resource_plan(
+ WMAlterResourcePlanRequest wmAlterResourcePlanRequest)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .alter_resource_plan(wmAlterResourcePlanRequest);
+ }
+ }
+
+ @Override
+ public WMValidateResourcePlanResponse validate_resource_plan(
+ WMValidateResourcePlanRequest wmValidateResourcePlanRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .validate_resource_plan(wmValidateResourcePlanRequest);
+ }
+ }
+
+ @Override
+ public WMDropResourcePlanResponse drop_resource_plan(
+ WMDropResourcePlanRequest wmDropResourcePlanRequest)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .drop_resource_plan(wmDropResourcePlanRequest);
+ }
+ }
+
+ @Override
+ public WMCreateTriggerResponse create_wm_trigger(
+ WMCreateTriggerRequest wmCreateTriggerRequest)
+ throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+ MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .create_wm_trigger(wmCreateTriggerRequest);
+ }
+ }
+
+ @Override
+ public WMAlterTriggerResponse alter_wm_trigger(
+ WMAlterTriggerRequest wmAlterTriggerRequest)
+ throws NoSuchObjectException, InvalidObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .alter_wm_trigger(wmAlterTriggerRequest);
+ }
+ }
+
+ @Override
+ public WMDropTriggerResponse drop_wm_trigger(WMDropTriggerRequest wmDropTriggerRequest)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .drop_wm_trigger(wmDropTriggerRequest);
+ }
+ }
+
+ @Override
+ public WMGetTriggersForResourePlanResponse get_triggers_for_resourceplan(
+ WMGetTriggersForResourePlanRequest wmGetTriggersForResourePlanRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_triggers_for_resourceplan(wmGetTriggersForResourePlanRequest);
+ }
+ }
+
+ @Override
+ public WMCreatePoolResponse create_wm_pool(WMCreatePoolRequest wmCreatePoolRequest)
+ throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+ MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .create_wm_pool(wmCreatePoolRequest);
+ }
+ }
+
+ @Override
+ public WMAlterPoolResponse alter_wm_pool(WMAlterPoolRequest wmAlterPoolRequest)
+ throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+ MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().alter_wm_pool(wmAlterPoolRequest);
+ }
+ }
+
+ @Override
+ public WMDropPoolResponse drop_wm_pool(WMDropPoolRequest wmDropPoolRequest)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().drop_wm_pool(wmDropPoolRequest);
+ }
+ }
+
+ @Override
+ public WMCreateOrUpdateMappingResponse create_or_update_wm_mapping(
+ WMCreateOrUpdateMappingRequest wmCreateOrUpdateMappingRequest)
+ throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+ MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .create_or_update_wm_mapping(wmCreateOrUpdateMappingRequest);
+ }
+ }
+
+ @Override
+ public WMDropMappingResponse drop_wm_mapping(WMDropMappingRequest wmDropMappingRequest)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .drop_wm_mapping(wmDropMappingRequest);
+ }
+ }
+
+ @Override
+ public WMCreateOrDropTriggerToPoolMappingResponse
+ create_or_drop_wm_trigger_to_pool_mapping(
+ WMCreateOrDropTriggerToPoolMappingRequest wmCreateOrDropTriggerToPoolMappingRequest)
+ throws AlreadyExistsException, NoSuchObjectException, InvalidObjectException,
+ MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .create_or_drop_wm_trigger_to_pool_mapping(
+ wmCreateOrDropTriggerToPoolMappingRequest);
+ }
+ }
+
+ @Override
+ public void create_ischema(ISchema iSchema)
+ throws AlreadyExistsException, NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().create_ischema(iSchema);
+ }
+ }
+
+ @Override
+ public void alter_ischema(AlterISchemaRequest alterISchemaRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().alter_ischema(alterISchemaRequest);
+ }
+ }
+
+ @Override
+ public ISchema get_ischema(ISchemaName iSchemaName)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_ischema(iSchemaName);
+ }
+ }
+
+ @Override
+ public void drop_ischema(ISchemaName iSchemaName)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().drop_ischema(iSchemaName);
+ }
+ }
+
+ @Override
+ public void add_schema_version(SchemaVersion schemaVersion)
+ throws AlreadyExistsException, NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().add_schema_version(schemaVersion);
+ }
+ }
+
+ @Override
+ public SchemaVersion get_schema_version(SchemaVersionDescriptor schemaVersionDescriptor)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_schema_version(schemaVersionDescriptor);
+ }
+ }
+
+ @Override
+ public SchemaVersion get_schema_latest_version(ISchemaName iSchemaName)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_schema_latest_version(iSchemaName);
+ }
+ }
+
+ @Override
+ public List<SchemaVersion> get_schema_all_versions(ISchemaName iSchemaName)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_schema_all_versions(iSchemaName);
+ }
+ }
+
+ @Override
+ public void drop_schema_version(SchemaVersionDescriptor schemaVersionDescriptor)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .drop_schema_version(schemaVersionDescriptor);
+ }
+ }
+
+ @Override
+ public FindSchemasByColsResp get_schemas_by_cols(
+ FindSchemasByColsRqst findSchemasByColsRqst) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_schemas_by_cols(findSchemasByColsRqst);
+ }
+ }
+
+ @Override
+ public void map_schema_version_to_serde(
+ MapSchemaVersionToSerdeRequest mapSchemaVersionToSerdeRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .map_schema_version_to_serde(mapSchemaVersionToSerdeRequest);
+ }
+ }
+
+ @Override
+ public void set_schema_version_state(
+ SetSchemaVersionStateRequest setSchemaVersionStateRequest)
+ throws NoSuchObjectException, InvalidOperationException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .set_schema_version_state(setSchemaVersionStateRequest);
+ }
+ }
+
+ @Override
+ public void add_serde(SerDeInfo serDeInfo)
+ throws AlreadyExistsException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().add_serde(serDeInfo);
+ }
+ }
+
+ @Override
+ public SerDeInfo get_serde(GetSerdeRequest getSerdeRequest)
+ throws NoSuchObjectException, MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_serde(getSerdeRequest);
+ }
+ }
+
+ @Override
+ public LockResponse get_lock_materialization_rebuild(String s, String s1, long l)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_lock_materialization_rebuild(s, s1, l);
+ }
+ }
+
+ @Override
+ public boolean heartbeat_lock_materialization_rebuild(String s, String s1, long l)
+ throws TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .heartbeat_lock_materialization_rebuild(s, s1, l);
+ }
+ }
+
+ @Override
+ public void add_runtime_stats(RuntimeStat runtimeStat)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient().add_runtime_stats(runtimeStat);
+ }
+ }
+
+ @Override
+ public List<RuntimeStat> get_runtime_stats(
+ GetRuntimeStatsRequest getRuntimeStatsRequest) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_runtime_stats(getRuntimeStatsRequest);
+ }
+ }
+
+ @Override
+ public ScheduledQueryPollResponse scheduled_query_poll(
+ ScheduledQueryPollRequest scheduledQueryPollRequest)
+ throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .scheduled_query_poll(scheduledQueryPollRequest);
+ }
+ }
+
+ @Override
+ public void scheduled_query_maintenance(
+ ScheduledQueryMaintenanceRequest scheduledQueryMaintenanceRequest)
+ throws MetaException, NoSuchObjectException, AlreadyExistsException,
+ InvalidInputException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .scheduled_query_maintenance(scheduledQueryMaintenanceRequest);
+ }
+ }
+
+ @Override
+ public void scheduled_query_progress(
+ ScheduledQueryProgressInfo scheduledQueryProgressInfo)
+ throws MetaException, InvalidOperationException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .scheduled_query_progress(scheduledQueryProgressInfo);
+ }
+ }
+
+ @Override
+ public ScheduledQuery get_scheduled_query(ScheduledQueryKey scheduledQueryKey)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_scheduled_query(scheduledQueryKey);
+ }
+ }
+
+ @Override
+ public void add_replication_metrics(ReplicationMetricList replicationMetricList)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ client.getHiveClient().getThriftClient()
+ .add_replication_metrics(replicationMetricList);
+ }
+ }
+
+ @Override
+ public ReplicationMetricList get_replication_metrics(
+ GetReplicationMetricsRequest getReplicationMetricsRequest)
+ throws MetaException, NoSuchObjectException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient()
+ .get_replication_metrics(getReplicationMetricsRequest);
+ }
+ }
+
+ @Override
+ public long get_latest_txnid_in_conflict(long txnId) throws MetaException, TException {
+ try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
+ return client.getHiveClient().getThriftClient().get_latest_txnid_in_conflict(txnId);
+ }
+ }
+
+ @Override
+ public String getName() throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "getName"));
+ }
+
+ @Override
+ public String getVersion() throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "getVersion"));
+ }
+
+ @Override
+ public fb_status getStatus() throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "getStatus"));
+ }
+
+ @Override
+ public String getStatusDetails() throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "getStatusDetails"));
+ }
+
+ @Override
+ public Map<String, Long> getCounters() throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "getCounters"));
+ }
+
+ @Override
+ public long getCounter(String s) throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "getCounter"));
+ }
+
+ @Override
+ public void setOption(String s, String s1) throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "setOption"));
+
+ }
+
+ @Override
+ public String getOption(String s) throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "getOption"));
+ }
+
+ @Override
+ public Map<String, String> getOptions() throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "getOptions"));
+ }
+
+ @Override
+ public String getCpuProfile(int i) throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "getCpuProfile"));
+ }
+
+ @Override
+ public long aliveSince() throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "aliveSince"));
+ }
+
+ @Override
+ public void reinitialize() throws TException {
+ throw new UnsupportedOperationException(String.format(NOT_IMPLEMENTED_UNSUPPORTED,
+ "reinitialize"));
+
+ }
+
+ @Override
+ public void shutdown() throws TException {
+ // Nothing to do here. This call is the hook for any session-specific clean-up.
+ }
+}
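Every handler above follows the same passthrough shape: borrow a MetaStoreClient from catalogd's client pool in a try-with-resources block (closing the client hands it back to the pool rather than tearing down the connection), forward the Thrift call, and return the result. A minimal sketch of the pattern, assuming the accessors shown above; some_hms_api and its request/response types are placeholders, not a real HMS method:

    @Override
    public SomeResponse some_hms_api(SomeRequest rqst) throws TException {
      // Borrow a pooled HMS client; close() at the end of the block returns
      // the client to the pool instead of disconnecting it.
      try (MetaStoreClient client = catalog_.getMetaStoreClient()) {
        return client.getHiveClient().getThriftClient().some_hms_api(rqst);
      }
    }

The service-introspection methods (getName, getStatus, getCounters and the rest) are the one exception: they have no backing HMS call to forward, so they throw UnsupportedOperationException via NOT_IMPLEMENTED_UNSUPPORTED.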
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
new file mode 100644
index 0000000..399ed09
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import org.apache.impala.catalog.CatalogException;
+
+/**
+ * A no-op ICatalogMetastoreServer implementation, used when the metastore service is
+ * not configured.
+ */
+public class NoOpCatalogMetastoreServer implements ICatalogMetastoreServer {
+ public static final NoOpCatalogMetastoreServer INSTANCE =
+ new NoOpCatalogMetastoreServer();
+ private static final String EMPTY = "";
+
+ @Override
+ public void start() throws CatalogException {
+ // no-op
+ }
+
+ @Override
+ public void stop() throws CatalogException {
+ // no-op
+ }
+
+ @Override
+ public String getMetrics() {
+ return EMPTY;
+ }
+}
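NoOpCatalogMetastoreServer lets callers hold an ICatalogMetastoreServer unconditionally instead of null-checking when the HMS endpoint is disabled. As a sketch, not taken from this patch, the selection at catalogd startup could look like the following; chooseMetastoreServer is a hypothetical helper name, while the flag accessor and both server types are real:

    // Hypothetical selection logic keyed off the start_hms_server flag.
    static ICatalogMetastoreServer chooseMetastoreServer(CatalogServiceCatalog catalog)
        throws ImpalaException {
      if (!BackendConfig.INSTANCE.startHmsServer()) {
        // HMS endpoint disabled: start(), stop() and getMetrics() are no-ops.
        return NoOpCatalogMetastoreServer.INSTANCE;
      }
      // Serves the HMS thrift API on the port given by the hms_port flag.
      return new CatalogMetastoreServer(catalog);
    }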
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 4f68669..7ae4dc7 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.impala.analysis.SqlScanner;
import org.apache.impala.thrift.TBackendGflags;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@@ -288,4 +289,25 @@ public class BackendConfig {
public int getMaxWaitTimeForSyncDdlSecs() {
return backendCfg_.max_wait_time_for_sync_ddl_s;
}
+
+ public boolean startHmsServer() {
+ return backendCfg_.start_hms_server;
+ }
+
+ public int getHMSPort() {
+ return backendCfg_.hms_port;
+ }
+
+ public boolean fallbackToHMSOnErrors() {
+ return backendCfg_.fallback_to_hms_on_errors;
+ }
+
+ @VisibleForTesting
+ public void setEnableCatalogdHMSCache(boolean flag) {
+ backendCfg_.enable_catalogd_hms_cache = flag;
+ }
+
+ public boolean enableCatalogdHMSCache() {
+ return backendCfg_.enable_catalogd_hms_cache;
+ }
}
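These getters expose the start_hms_server, hms_port, fallback_to_hms_on_errors and enable_catalogd_hms_cache backend gflags to the frontend. To illustrate how fallbackToHMSOnErrors() is meant to be consulted, here is a hedged sketch of a fallback guard; serveFromCache and forwardToHms are hypothetical helpers, not methods from this patch, and the real error handling lives in the handler classes above:

    // Hypothetical fallback guard around a cache-backed HMS call.
    GetTableResult getTableWithFallback(GetTableRequest req) throws TException {
      try {
        return serveFromCache(req);  // answer from catalogd's cached metadata
      } catch (TException e) {
        // Only mask the error when the flag allows falling back to HMS.
        if (!BackendConfig.INSTANCE.fallbackToHMSOnErrors()) throw e;
        return forwardToHms(req);  // forward the request to the backing HMS
      }
    }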
diff --git a/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsFileMetadataTest.java b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsFileMetadataTest.java
new file mode 100644
index 0000000..158b359
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/metastore/CatalogHmsFileMetadataTest.java
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.net.Inet4Address;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.HdfsPartition.FileBlock;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.fb.FbFileBlock;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.CatalogServiceTestCatalog.CatalogServiceTestHMSCatalog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class CatalogHmsFileMetadataTest {
+ private static CatalogServiceCatalog catalog_;
+ private static HiveMetaStoreClient catalogHmsClient_;
+ private static final Configuration CONF = MetastoreConf.newMetastoreConf();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ catalog_ = CatalogServiceTestCatalog.createTestCatalogMetastoreServer();
+ MetastoreConf.setVar(CONF, ConfVars.THRIFT_URIS,
+ "thrift://localhost:" + ((CatalogServiceTestHMSCatalog) catalog_).getPort());
+ // Metastore clients which connect to catalogd's HMS endpoint need this
+ // configuration set since the forwarded HMS calls use catalogd's HMS client,
+ // not the end-user's UGI.
+ CONF.set("hive.metastore.execute.setugi", "false");
+ catalogHmsClient_ = new HiveMetaStoreClient(CONF);
+ }
+
+ @AfterClass
+ public static void cleanUp() throws Exception {
+ catalog_.close();
+ }
+
+ /**
+ * The test fetches partitions of a table over the HMS API and verifies that the
+ * deserialized file metadata from the response matches what we have in catalogd.
+ */
+ @Test
+ public void testFileMetadataForPartitions() throws Exception {
+ // get partitions from catalog directly
+ HdfsTable tbl = (HdfsTable) catalog_
+ .getOrLoadTable("functional", "alltypes", "test", null);
+ HdfsPartition hdfsPartition1 = tbl
+ .getPartitionsForNames(Arrays.asList("year=2009/month=1")).get(0);
+ HdfsPartition hdfsPartition2 = tbl
+ .getPartitionsForNames(Arrays.asList("year=2009/month=2")).get(0);
+
+ // test empty partitions result case.
+ GetPartitionsByNamesRequest request = new GetPartitionsByNamesRequest();
+ String dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+ request.setDb_name(dbName);
+ request.setTbl_name("alltypes");
+ // no names are set so the result is expected to be empty
+ request.setNames(new ArrayList<>());
+ request.setGetFileMetadata(true);
+ GetPartitionsByNamesResult result = catalogHmsClient_.getPartitionsByNames(request);
+ assertTrue(result.getPartitions().isEmpty());
+ Map<Partition, List<FileDescriptor>> fds = CatalogHmsClientUtils
+ .extractFileDescriptors(result, tbl.getHostIndex());
+ assertTrue(fds.isEmpty());
+
+ // get the partitions over HMS API.
+ request = new GetPartitionsByNamesRequest();
+ dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+ request.setDb_name(dbName);
+ request.setTbl_name("alltypes");
+ request.setNames(Arrays.asList("year=2009/month=1", "year=2009/month=2"));
+ request.setGetFileMetadata(true);
+ result = catalogHmsClient_.getPartitionsByNames(request);
+ for (Partition part : result.getPartitions()) {
+ assertNotNull(part.getFileMetadata());
+ }
+ assertNotNull(result.getDictionary());
+ fds = CatalogHmsClientUtils
+ .extractFileDescriptors(result, tbl.getHostIndex());
+ assertEquals(2, fds.size());
+ for (List<FileDescriptor> partFds : fds.values()) {
+ assertFalse(partFds.isEmpty());
+ assertEquals(1, partFds.size());
+ }
+ // make sure that the FileDescriptors from catalog and over HMS API are the same
+ // for the same hostIndex
+ assertFdsAreSame(hdfsPartition1.getFileDescriptors(),
+ fds.get(result.getPartitions().get(0)));
+ assertFdsAreSame(hdfsPartition2.getFileDescriptors(),
+ fds.get(result.getPartitions().get(1)));
+ }
+
+ public static void assertFdsAreSame(List<FileDescriptor> fdsFromCatalog,
+ List<FileDescriptor> fdsFromHMS) {
+ assertEquals(fdsFromCatalog.size(), fdsFromHMS.size());
+ for (int i=0; i<fdsFromCatalog.size(); i++) {
+ FileDescriptor fdFromCatalog = fdsFromCatalog.get(i);
+ FileDescriptor fdFromHMS = fdsFromHMS.get(i);
+ assertEquals(fdFromCatalog.getRelativePath(), fdFromHMS.getRelativePath());
+ assertEquals(fdFromCatalog.getFileCompression(), fdFromHMS.getFileCompression());
+ assertEquals(fdFromCatalog.getFileLength(), fdFromHMS.getFileLength());
+ assertEquals(fdFromCatalog.getIsEc(), fdFromHMS.getIsEc());
+ assertEquals(fdFromCatalog.getModificationTime(), fdFromHMS.getModificationTime());
+ assertEquals(fdFromCatalog.getNumFileBlocks(), fdFromHMS.getNumFileBlocks());
+ for (int j=0; j<fdFromCatalog.getNumFileBlocks(); j++) {
+ FbFileBlock blockFromCat = fdFromCatalog.getFbFileBlock(j);
+ FbFileBlock blockFromHMS = fdFromHMS.getFbFileBlock(j);
+ // quick and dirty way to compare the relevant fields within the file blocks.
+ assertEquals(FileBlock.debugString(blockFromCat),
+ FileBlock.debugString(blockFromHMS));
+ }
+ }
+ }
+
+ /**
+ * The test requests a table over the HMS API with file metadata and verifies that
+ * the file metadata returned is the same as what we have in catalogd.
+ */
+ @Test
+ public void testFileMetadataForTable() throws Exception {
+ Table tbl = catalogHmsClient_
+ .getTable(null, "functional", "zipcode_incomes", null, false, null, true);
+ assertNotNull(tbl.getFileMetadata());
+ HdfsTable catTbl = (HdfsTable) catalog_
+ .getOrLoadTable("functional", "zipcode_incomes", "test", null);
+ HdfsPartition part = (HdfsPartition) Iterables.getOnlyElement(catTbl.getPartitions());
+ List<FileDescriptor> hmsTblFds = CatalogHmsClientUtils
+ .extractFileDescriptors(tbl, catTbl.getHostIndex());
+ assertFdsAreSame(part.getFileDescriptors(), hmsTblFds);
+ }
+}
diff --git a/fe/src/test/java/org/apache/impala/catalog/metastore/EnableCatalogdHmsCacheFlagTest.java b/fe/src/test/java/org/apache/impala/catalog/metastore/EnableCatalogdHmsCacheFlagTest.java
new file mode 100644
index 0000000..9409ddb
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/catalog/metastore/EnableCatalogdHmsCacheFlagTest.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog.metastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
+import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesResult;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsPartition;
+import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.testutil.CatalogServiceTestCatalog;
+import org.apache.impala.testutil.CatalogServiceTestCatalog.CatalogServiceTestHMSCatalog;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class EnableCatalogdHmsCacheFlagTest {
+ private static CatalogServiceCatalog catalog_;
+ private static HiveMetaStoreClient catalogHmsClient_;
+ private static final Configuration CONF = MetastoreConf.newMetastoreConf();
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ catalog_ = CatalogServiceTestCatalog.createTestCatalogMetastoreServer();
+ MetastoreConf.setVar(CONF, ConfVars.THRIFT_URIS,
+ "thrift://localhost:" + ((CatalogServiceTestHMSCatalog) catalog_).getPort());
+ // Metastore clients which connect to catalogd's HMS endpoint need this
+ // configuration set since the forwarded HMS calls use catalogd's HMS client,
+ // not the end-user's UGI.
+ CONF.set("hive.metastore.execute.setugi", "false");
+ catalogHmsClient_ = new HiveMetaStoreClient(CONF);
+ }
+
+ @AfterClass
+ public static void cleanUp() throws Exception {
+ catalog_.close();
+ }
+
+ /**
+ * The test toggles the enable_catalogd_hms_cache flag and verifies that the file
+ * metadata returned over the HMS API still matches what we have in catalogd.
+ */
+ @Test
+ public void testEnableCatalogdCachingFlag() throws Exception {
+
+ BackendConfig.INSTANCE.setEnableCatalogdHMSCache(true);
+
+ // get partitions from catalog directly
+ HdfsTable tbl = (HdfsTable) catalog_
+ .getOrLoadTable("functional", "alltypes", "test", null);
+ HdfsPartition hdfsPartition1 = tbl
+ .getPartitionsForNames(Arrays.asList("year=2009/month=1")).get(0);
+ HdfsPartition hdfsPartition2 = tbl
+ .getPartitionsForNames(Arrays.asList("year=2009/month=2")).get(0);
+
+ BackendConfig.INSTANCE.setEnableCatalogdHMSCache(false);
+
+ // test empty partitions result case.
+ GetPartitionsByNamesRequest request = new GetPartitionsByNamesRequest();
+ String dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+ request.setDb_name(dbName);
+ request.setTbl_name("alltypes");
+ // no names are set so the result is expected to be empty
+ request.setNames(new ArrayList<>());
+ request.setGetFileMetadata(true);
+ GetPartitionsByNamesResult result = catalogHmsClient_.getPartitionsByNames(request);
+ assertTrue(result.getPartitions().isEmpty());
+ Map<Partition, List<FileDescriptor>> fds = CatalogHmsClientUtils
+ .extractFileDescriptors(result, tbl.getHostIndex());
+ assertTrue(fds.isEmpty());
+
+ // get the partitions over HMS API.
+ request = new GetPartitionsByNamesRequest();
+ dbName = MetaStoreUtils.prependCatalogToDbName("functional", CONF);
+ request.setDb_name(dbName);
+ request.setTbl_name("alltypes");
+ request.setNames(Arrays.asList("year=2009/month=1", "year=2009/month=2"));
+ request.setGetFileMetadata(true);
+ result = catalogHmsClient_.getPartitionsByNames(request);
+ for (Partition part : result.getPartitions()) {
+ assertNotNull(part.getFileMetadata());
+ }
+ assertNotNull(result.getDictionary());
+ fds = CatalogHmsClientUtils
+ .extractFileDescriptors(result, tbl.getHostIndex());
+ assertEquals(2, fds.size());
+ for (List<FileDescriptor> partFds : fds.values()) {
+ assertFalse(partFds.isEmpty());
+ assertEquals(1, partFds.size());
+ }
+
+ // make sure that the FileDescriptors from catalog and over HMS API are the same
+ // for the same hostIndex
+ CatalogHmsFileMetadataTest.assertFdsAreSame(hdfsPartition1.getFileDescriptors(),
+ fds.get(result.getPartitions().get(0)));
+ CatalogHmsFileMetadataTest.assertFdsAreSame(hdfsPartition2.getFileDescriptors(),
+ fds.get(result.getPartitions().get(1)));
+ }
+}
\ No newline at end of file
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index 637a114..a942ee2 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -17,12 +17,15 @@
package org.apache.impala.testutil;
+import java.net.ServerSocket;
import org.apache.impala.authorization.AuthorizationFactory;
import org.apache.impala.authorization.NoopAuthorizationFactory;
import org.apache.impala.authorization.AuthorizationPolicy;
import org.apache.impala.authorization.NoopAuthorizationFactory.NoopAuthorizationManager;
+import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.MetaStoreClientPool;
+import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.common.ImpalaException;
import org.apache.impala.service.FeSupport;
@@ -54,19 +57,29 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
return createWithAuth(new NoopAuthorizationFactory());
}
+ public static CatalogServiceCatalog createWithAuth(AuthorizationFactory authzFactory) {
+ return createWithAuth(authzFactory, false);
+ }
+
/**
* Creates a catalog server that reads authorization policy metadata from the
* authorization config.
*/
- public static CatalogServiceCatalog createWithAuth(AuthorizationFactory authzFactory) {
+ public static CatalogServiceCatalog createWithAuth(AuthorizationFactory authzFactory,
+ boolean startCatalogHms) {
FeSupport.loadLibrary();
CatalogServiceCatalog cs;
try {
if (MetastoreShim.getMajorVersion() > 2) {
MetastoreShim.setHiveClientCapabilities();
}
- cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
- new MetaStoreClientPool(0, 0));
+ if (startCatalogHms) {
+ cs = new CatalogServiceTestHMSCatalog(false, 16, new TUniqueId(),
+ new MetaStoreClientPool(0, 0));
+ } else {
+ cs = new CatalogServiceTestCatalog(false, 16, new TUniqueId(),
+ new MetaStoreClientPool(0, 0));
+ }
cs.setAuthzManager(authzFactory.newAuthorizationManager(cs));
cs.reset();
} catch (ImpalaException e) {
@@ -95,6 +108,76 @@ public class CatalogServiceTestCatalog extends CatalogServiceCatalog {
return cs;
}
+ private static class CatalogTestMetastoreServer extends CatalogMetastoreServer {
+ private final int port;
+ public CatalogTestMetastoreServer(
+ CatalogServiceCatalog catalogServiceCatalog) throws ImpalaException {
+ super(catalogServiceCatalog);
+ try {
+ port = getRandomPort();
+ } catch (Exception e) {
+ throw new CatalogException(e.getMessage());
+ }
+ }
+
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ private static int getRandomPort() throws Exception {
+ for (int i = 0; i < 5; i++) {
+ // Bind an ephemeral port; try-with-resources closes the probe socket
+ // so the port can be reused by the test metastore server.
+ try (ServerSocket serverSocket = new ServerSocket(0)) {
+ return serverSocket.getLocalPort();
+ } catch (java.io.IOException e) {
+ // Binding failed; retry with another ephemeral port.
+ }
+ }
+ throw new Exception("Could not find a free port");
+ }
+ }
+
+ public static class CatalogServiceTestHMSCatalog extends CatalogServiceTestCatalog {
+ private CatalogTestMetastoreServer metastoreServer;
+ public CatalogServiceTestHMSCatalog(boolean loadInBackground, int numLoadingThreads,
+ TUniqueId catalogServiceId,
+ MetaStoreClientPool metaStoreClientPool) throws ImpalaException {
+ super(loadInBackground, numLoadingThreads, catalogServiceId, metaStoreClientPool);
+ }
+
+ @Override
+ protected CatalogMetastoreServer getCatalogMetastoreServer() {
+ synchronized (this) {
+ if (metastoreServer != null) return metastoreServer;
+ try {
+ metastoreServer = new CatalogTestMetastoreServer(this);
+ } catch (ImpalaException e) {
+ return null;
+ }
+ return metastoreServer;
+ }
+ }
+
+ public int getPort() { return metastoreServer.getPort(); }
+
+ @Override
+ public void close() {
+ super.close();
+ try {
+ metastoreServer.stop();
+ } catch (CatalogException e) {
+ // ignored
+ }
+ }
+ }
+
+ public static CatalogServiceCatalog createTestCatalogMetastoreServer()
+ throws ImpalaException {
+ return createWithAuth(new NoopAuthorizationFactory(), true);
+ }
+
@Override
public AuthorizationPolicy getAuthPolicy() { return authPolicy_; }
}
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index 7978f91..6145b1d 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -30,6 +30,7 @@ import requests
import socket
import subprocess
import time
+import string
from functools import wraps
from getpass import getuser
from random import choice
@@ -151,6 +152,24 @@ class ImpalaTestSuite(BaseTestSuite):
# to work with the HS2 client can add HS2 in addition to or instead of beeswax.
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('protocol', 'beeswax'))
+ @staticmethod
+ def create_hive_client(port):
+ """
+ Creates an HMS client for an external metastore service running at the provided port.
+ """
+ trans_type = 'buffered'
+ if pytest.config.option.use_kerberos:
+ trans_type = 'kerberos'
+ hive_transport = create_transport(
+ host=pytest.config.option.metastore_server.split(':')[0],
+ port=port,
+ service=pytest.config.option.hive_service_name,
+ transport_type=trans_type)
+ protocol = TBinaryProtocol.TBinaryProtocol(hive_transport)
+ hive_client = ThriftHiveMetastore.Client(protocol)
+ hive_transport.open()
+ return hive_client, hive_transport
+
@classmethod
def setup_class(cls):
"""Setup section that runs before each test suite"""
@@ -1185,3 +1204,11 @@ class ImpalaTestSuite(BaseTestSuite):
LOG.info("Expected log lines could not be found, sleeping before retrying: %s",
str(e))
time.sleep(1)
+
+ @staticmethod
+ def get_random_name(prefix='', length=5):
+ """
+ Gets a random name, used to create unique database or table names.
+ """
+ assert length > 0
+ return prefix + ''.join(choice(string.ascii_lowercase) for i in range(length))
diff --git a/tests/custom_cluster/test_metastore_service.py b/tests/custom_cluster/test_metastore_service.py
new file mode 100644
index 0000000..c60fedf
--- /dev/null
+++ b/tests/custom_cluster/test_metastore_service.py
@@ -0,0 +1,707 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+from hive_metastore.ttypes import Database
+from hive_metastore.ttypes import FieldSchema
+from hive_metastore.ttypes import GetTableRequest
+from hive_metastore.ttypes import GetPartitionsByNamesRequest
+from hive_metastore.ttypes import Table
+from hive_metastore.ttypes import StorageDescriptor
+from hive_metastore.ttypes import SerDeInfo
+
+from tests.util.event_processor_utils import EventProcessorUtils
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+
+class TestMetastoreService(CustomClusterTestSuite):
+ """
+ Tests for the Catalog Metastore service. Each test in this class should
+ start an HMS server using the catalogd flag --start_hms_server=true.
+ """
+
+ part_tbl = ImpalaTestSuite.get_random_name("test_metastore_part_tbl")
+ unpart_tbl = ImpalaTestSuite.get_random_name("test_metastore_unpart_tbl")
+ acid_part_tbl = ImpalaTestSuite.get_random_name("test_metastore_acid_part_tbl")
+ unpart_acid_tbl = ImpalaTestSuite.get_random_name("test_metastore_unpart_acid_tbl")
+ default_unknowntbl = ImpalaTestSuite.get_random_name("test_metastore_default_tbl")
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--use_local_catalog=true",
+ catalogd_args="--catalog_topic_mode=minimal "
+ "--start_hms_server=true "
+ "--hms_port=5899"
+ )
+ def test_passthrough_apis(self):
+ """
+ This test exercises some of the Catalog HMS APIs which are directly
+ passed through to the backing HMS service. This is by no means an
+ exhaustive set; it merely serves as a sanity check that a hive_client
+ can connect to the Catalog's metastore service and is able to execute
+ calls to the backing HMS service.
+ """
+ catalog_hms_client = None
+ db_name = ImpalaTestSuite.get_random_name("test_passthrough_apis_db")
+ try:
+ catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+ assert catalog_hms_client is not None
+ # get_databases
+ databases = catalog_hms_client.get_all_databases()
+ assert databases is not None
+ assert len(databases) > 0
+ assert "functional" in databases
+ # get_database
+ database = catalog_hms_client.get_database("functional")
+ assert database is not None
+ assert "functional" == database.name
+ # get_tables
+ tables = catalog_hms_client.get_tables("functional", "*")
+ assert tables is not None
+ assert len(tables) > 0
+ assert "alltypes" in tables
+ # get table
+ table = catalog_hms_client.get_table("functional", "alltypes")
+ assert table is not None
+ assert "alltypes" == table.tableName
+ assert table.sd is not None
+ # get partitions
+ partitions = catalog_hms_client.get_partitions("functional", "alltypes", -1)
+ assert partitions is not None
+ assert len(partitions) > 0
+ # get partition names
+ part_names = catalog_hms_client.get_partition_names("functional", "alltypes",
+ -1)
+ assert part_names is not None
+ assert len(part_names) > 0
+ assert "year=2009/month=1" in part_names
+ # notification APIs
+ event_id = EventProcessorUtils.get_current_notification_id(catalog_hms_client)
+ assert event_id is not None
+ assert event_id > 0
+ # DDLs
+ catalog_hms_client.create_database(self.__get_test_database(db_name))
+ database = catalog_hms_client.get_database(db_name)
+ assert database is not None
+ assert db_name == database.name
+ tbl_name = ImpalaTestSuite.get_random_name(
+ "test_passthrough_apis_tbl")
+ cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+ part_cols = [["part", "string", "part col"]]
+ catalog_hms_client.create_table(self.__get_test_tbl(db_name, tbl_name,
+ cols, part_cols))
+ table = catalog_hms_client.get_table(db_name, tbl_name)
+ assert table is not None
+ assert tbl_name == table.tableName
+ self.__compare_cols(cols, table.sd.cols)
+ self.__compare_cols(part_cols, table.partitionKeys)
+ finally:
+ if catalog_hms_client is not None:
+ catalog_hms_client.drop_database(db_name, True, True)
+ catalog_hms_client.shutdown()
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--use_local_catalog=true",
+ catalogd_args="--catalog_topic_mode=minimal "
+ "--start_hms_server=true "
+ "--hms_port=5899"
+ )
+ def test_get_table_req_with_fallback(self):
+ """
+ Tests the get_table_req API with fallback to HMS enabled. These calls
+ succeed even if catalog throws exceptions since we fall back to HMS.
+ """
+ catalog_hms_client = None
+ db_name = ImpalaTestSuite.get_random_name(
+ "test_get_table_req_with_fallback_db")
+ try:
+ catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+ assert catalog_hms_client is not None
+
+ # Test simple get_table_req without stats.
+ get_table_request = GetTableRequest()
+ get_table_request.dbName = "functional"
+ get_table_request.tblName = "alltypes"
+ get_table_request.getFileMetadata = True
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == "functional"
+ assert get_table_response.table.tableName == "alltypes"
+ # Request did not ask for stats, verify colStats are not populated.
+ assert get_table_response.table.colStats is None
+ # assert that fileMetadata is in the response
+ self.__assert_filemd(get_table_response.table.fileMetadata,
+ get_table_response.table.dictionary)
+
+ # Test get_table_request with stats and engine set to Impala.
+ get_table_request.getColumnStats = True
+ get_table_request.engine = "impala"
+ get_table_request.getFileMetadata = False
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == "functional"
+ assert get_table_response.table.tableName == "alltypes"
+ assert get_table_response.table.colStats is not None
+ # Verify the column stats objects are populated for
+ # non-clustering columns.
+ assert len(get_table_response.table.colStats.statsObj) > 0
+ # file-metadata is not requested and hence should not be set;
+ # assert that fileMetadata is not in the response
+ self.__assert_no_filemd(get_table_response.table.fileMetadata,
+ get_table_response.table.dictionary)
+
+ # Create table in Hive and test this is properly falling back to HMS.
+ # Create table via Hive.
+ catalog_hms_client.create_database(self.__get_test_database(db_name))
+ database = catalog_hms_client.get_database(db_name)
+ assert database is not None
+ assert db_name == database.name
+ tbl_name = ImpalaTestSuite.get_random_name(
+ "test_get_table_req_with_fallback_tbl")
+ cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+ catalog_hms_client.create_table(self.__get_test_tbl(db_name, tbl_name, cols))
+
+ get_table_request.dbName = db_name
+ get_table_request.tblName = tbl_name
+ get_table_request.getColumnStats = True
+ get_table_request.getFileMetadata = True
+
+ # Engine is not specified, this should throw an exception even if
+ # fallback_to_hms_on_errors is true.
+ expected_exception = "isGetColumnStats() is true in the request but " \
+ "engine is not specified."
+ self.__call_get_table_req_expect_exception(catalog_hms_client,
+ get_table_request, expected_exception)
+
+ # Set engine and request impala, this should fallback to HMS since
+ # the table is not in catalog cache. File metadata should be set even if the
+ # request is served via HMS fallback.
+ get_table_request.engine = "Impala"
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == db_name
+ assert get_table_response.table.tableName == tbl_name
+ assert get_table_response.table.fileMetadata is not None
+ # The table does not have any data so we expect the fileMetadata.data to be None
+ assert get_table_response.table.fileMetadata.data is None
+ # even if there are no files, the object dictionary should be not null and empty
+ assert get_table_response.table.dictionary is not None
+ assert len(get_table_response.table.dictionary.values) == 0
+ finally:
+ if catalog_hms_client is not None:
+ catalog_hms_client.shutdown()
+ if self.__get_database_no_throw(db_name) is not None:
+ self.hive_client.drop_database(db_name, True, True)
+
+ def __get_database_no_throw(self, db_name):
+ try:
+ return self.hive_client.get_database(db_name)
+ except Exception:
+ return None
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--use_local_catalog=true",
+ catalogd_args="--catalog_topic_mode=minimal "
+ "--start_hms_server=true "
+ "--hms_port=5899 "
+ "--fallback_to_hms_on_errors=false"
+ )
+ def test_get_table_req_without_fallback(self):
+ """
+ Tests the get_table_req API with fallback to HMS disabled. These calls
+ throw exceptions since we do not fall back to HMS if the db/table is not
+ in the Catalog cache.
+ """
+ catalog_hms_client = None
+ db_name = ImpalaTestSuite.get_random_name(
+ "test_get_table_req_without_fallback_db")
+ new_db_name = None
+ try:
+ catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+ assert catalog_hms_client is not None
+
+ # Create table via Hive.
+ self.hive_client.create_database(self.__get_test_database(db_name))
+ database = self.hive_client.get_database(db_name)
+ assert database is not None
+ assert db_name == database.name
+ tbl_name = ImpalaTestSuite.get_random_name("test_get_table_req_tbl")
+ cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+ self.hive_client.create_table(self.__get_test_tbl(db_name, tbl_name, cols))
+
+ # Test get_table_request with stats and engine set to Impala.
+ # Test simple get_table_req without stats.
+ get_table_request = GetTableRequest()
+ get_table_request.dbName = db_name
+ get_table_request.tblName = tbl_name
+ get_table_request.getColumnStats = True
+ # Engine is not set, this should throw an exception without falling
+ # back to HMS.
+ expected_exception_str = "Column stats are requested " \
+ "but engine is not set in the request."
+ self.__call_get_table_req_expect_exception(catalog_hms_client,
+ get_table_request, expected_exception_str)
+ # Verify DB not found exception is thrown. Engine does not matter,
+ # we only return Impala table level column stats but this field is
+ # required to be consistent with Hive's implementation.
+ get_table_request.engine = "Impala"
+ expected_exception_str = "Database " + db_name + " not found"
+ self.__call_get_table_req_expect_exception(catalog_hms_client,
+ get_table_request, expected_exception_str)
+
+ # Create database in Impala
+ new_db_name = ImpalaTestSuite.get_random_name(
+ "test_get_table_req_without_fallback_db")
+ query = "create database " + new_db_name
+ self.execute_query_expect_success(self.client, query)
+ new_tbl_name = ImpalaTestSuite.get_random_name(
+ "test_get_table_req_without_fallback_tbl")
+ new_cols = [["c1", "int", "col 1"], ["c2", "string", "col 2"]]
+ # DDLs currently only pass through to HMS so this call will not create a table
+ # in catalogd.
+ catalog_hms_client.create_table(self.__get_test_tbl(new_db_name, new_tbl_name,
+ new_cols))
+ # Verify table not found exception is thrown
+ get_table_request.dbName = new_db_name
+ get_table_request.tblName = new_tbl_name
+ expected_exception_str = "Table " + new_db_name + "." + new_tbl_name \
+ + " not found"
+ self.__call_get_table_req_expect_exception(catalog_hms_client,
+ get_table_request, expected_exception_str)
+
+ # Create a table in Impala and verify that the get_table_req gets the
+ # table and stats for non-clustering columns immediately.
+ impala_tbl_name = ImpalaTestSuite.get_random_name(
+ "test_get_table_req_without_fallback_impala_tbl")
+ query = "create table " + new_db_name + "." + impala_tbl_name + \
+ "(id int) partitioned by (year int)"
+ self.execute_query_expect_success(self.client, query)
+ get_table_request.dbName = new_db_name
+ get_table_request.tblName = impala_tbl_name
+ get_table_request.getColumnStats = True
+ # Engine does not matter, we only return Impala table level column
+ # stats but this field is required to be consistent with Hive's
+ # implementation.
+ get_table_request.engine = "impala"
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == new_db_name
+ assert get_table_response.table.tableName == impala_tbl_name
+ assert get_table_response.table.colStats is not None
+ assert get_table_response.table.colStats.statsObj is not None
+ # Verify there is a ColumnStatisticsObj only for non-clustering columns
+ # (id in this case).
+ assert len(get_table_response.table.colStats.statsObj) == 1
+ assert get_table_response.table.colStats.statsObj[0].colName == "id"
+
+ # Verify a non-default catalog in the request throws an exception.
+ get_table_request.catName = "testCatalog"
+ expected_exception_str = "Catalog service does not support " \
+ "non-default catalogs"
+ self.__call_get_table_req_expect_exception(catalog_hms_client,
+ get_table_request, expected_exception_str)
+ # fetch file-metadata of a non-partitioned table
+ get_table_request = GetTableRequest()
+ get_table_request.dbName = "functional"
+ get_table_request.tblName = "tinytable"
+ get_table_request.getFileMetadata = True
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == "functional"
+ assert get_table_response.table.tableName == "tinytable"
+ self.__assert_filemd(get_table_response.table.fileMetadata,
+ get_table_response.table.dictionary)
+
+ # fetch file-metadata along with stats
+ get_table_request = GetTableRequest()
+ get_table_request.dbName = "functional"
+ get_table_request.tblName = "tinytable"
+ get_table_request.getFileMetadata = True
+ get_table_request.getColumnStats = True
+ get_table_request.engine = "impala"
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == "functional"
+ assert get_table_response.table.tableName == "tinytable"
+ self.__assert_filemd(get_table_response.table.fileMetadata,
+ get_table_response.table.dictionary)
+ assert get_table_response.table.colStats is not None
+ assert get_table_response.table.colStats.statsObj is not None
+ finally:
+ if catalog_hms_client is not None:
+ catalog_hms_client.shutdown()
+ if self.__get_database_no_throw(db_name) is not None:
+ self.hive_client.drop_database(db_name, True, True)
+ if self.__get_database_no_throw(new_db_name) is not None:
+ self.hive_client.drop_database(new_db_name, True, True)
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--use_local_catalog=true",
+ catalogd_args="--catalog_topic_mode=minimal "
+ "--start_hms_server=true "
+ "--hms_port=5899 "
+ "--fallback_to_hms_on_errors=false"
+ )
+ def test_get_partitions_by_names(self):
+ catalog_hms_client = None
+ try:
+ catalog_hms_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+ assert catalog_hms_client is not None
+ valid_part_names = ["year=2009/month=1", "year=2009/month=2", "year=2009/month=3",
+ "year=2009/month=4"]
+ self.__run_partitions_by_names_tests(catalog_hms_client, "functional_parquet",
+ "alltypestiny", valid_part_names)
+ finally:
+ if catalog_hms_client is not None:
+ catalog_hms_client.shutdown()
+
+ @pytest.mark.execute_serially
+ @CustomClusterTestSuite.with_args(
+ impalad_args="--use_local_catalog=true",
+ catalogd_args="--catalog_topic_mode=minimal "
+ "--start_hms_server=true "
+ "--hms_port=5899 "
+ "--fallback_to_hms_on_errors=true"
+ )
+ def test_fallback_get_partitions_by_names(self, unique_database):
+ """
+ Tests that the fallback path works when there are errors in catalogd's
+ implementation of get_partitions_by_names_req.
+ """
+ catalog_client = None
+ try:
+ catalog_client, hive_transport = ImpalaTestSuite.create_hive_client(5899)
+ assert catalog_client is not None
+ assert self.hive_client is not None
+ self.__create_test_tbls_from_hive(unique_database)
+ valid_part_names = ["year=2009/month=1", "year=2009/month=2", "year=2009/month=3",
+ "year=2009/month=4"]
+ self.__run_partitions_by_names_tests(catalog_client, unique_database,
+ TestMetastoreService.part_tbl, valid_part_names, True)
+ finally:
+ if catalog_client is not None:
+ catalog_client.shutdown()
+
+ def __create_test_tbls_from_hive(self, db_name):
+ """Util method to create test tables from hive in the given database. It creates
+ 4 tables (partitioned and unpartitioned) for non-acid and acid cases and returns
+ the valid partition names for the partitioned tables."""
+ # Create a partitioned table.
+ # Creating a table from Hive and inserting into it is very slow, so we create
+ # an external table pointing to an existing table's location instead.
+ tbl_location = self.hive_client.get_table("functional", "alltypessmall").sd.location
+ self.run_stmt_in_hive(
+ "create external table {0}.{1} like functional.alltypessmall location '{2}'"
+ .format(db_name, TestMetastoreService.part_tbl, tbl_location))
+ self.run_stmt_in_hive(
+ "msck repair table {0}.{1}".format(db_name, TestMetastoreService.part_tbl))
+ # TODO: create an ACID partitioned table
+ tbl_location = self.hive_client.get_table("functional", "tinytable").sd.location
+ self.run_stmt_in_hive(
+ "create external table {0}.{1} like functional.tinytable location '{2}'"
+ .format(db_name, TestMetastoreService.unpart_tbl, tbl_location))
+ self.run_stmt_in_hive(
+ "create table default.{0} (c1 int)"
+ .format(TestMetastoreService.default_unknowntbl))
+
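+ # Illustrative use of the helper above: functional.alltypessmall has four
+ # partitions (year=2009/month=1 through year=2009/month=4), so callers pass
+ # the matching names, e.g.:
+ #
+ #   self.__create_test_tbls_from_hive(unique_database)
+ #   names = ["year=2009/month={0}".format(m) for m in range(1, 5)]
+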
+ @classmethod
+ def __run_get_table_tests(cls, catalog_hms_client, db_name, tbl_name,
+ fallback_expected=False):
+ """
+ Issues get_table_req for various cases and validates the response.
+ """
+ # Test simple get_table_req without stats.
+ get_table_request = GetTableRequest()
+ get_table_request.dbName = db_name
+ get_table_request.tblName = tbl_name
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == db_name
+ assert get_table_response.table.tableName == tbl_name
+ # The request did not ask for stats; verify colStats is not populated.
+ assert get_table_response.table.colStats is None
+
+ # Test get_table_req with stats and the engine set to Impala.
+ get_table_request.getColumnStats = True
+ get_table_request.engine = "impala"
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == db_name
+ assert get_table_response.table.tableName == tbl_name
+ assert get_table_response.table.colStats is not None
+ # Verify the column stats objects are populated for
+ # non-clustering columns.
+ assert len(get_table_response.table.colStats.statsObj) > 0
+
+ # We only return Impala table-level column stats, but the engine field is
+ # required for consistency with Hive's implementation.
+ get_table_request.engine = "hive"
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == db_name
+ assert get_table_response.table.tableName == tbl_name
+ assert get_table_response.table.colStats is not None
+ # Verify the column stats objects are populated for
+ # non-clustering columns.
+ assert len(get_table_response.table.colStats.statsObj) > 0
+
+ # Requests for tables which are unknown to catalogd should fall back to HMS
+ # if fallback_expected is True.
+ get_table_request.dbName = TestMetastoreService.test_db_name
+ get_table_request.tblName = TestMetastoreService.unpart_tbl
+ get_table_request.getColumnStats = True
+
+ # Engine is not specified, so this should throw an exception even if
+ # fallback_to_hms_on_errors is true.
+ expected_exception = "isGetColumnStats() is true in the request but " \
+ "engine is not specified."
+ exception_received = None
+ try:
+ catalog_hms_client.get_table_req(get_table_request)
+ except Exception as e:
+ exception_received = e
+ assert expected_exception in str(e)
+ assert exception_received is not None
+
+ # Set the engine to Impala; this should fall back to HMS since the table
+ # is not in the catalog cache.
+ get_table_request.engine = "Impala"
+ if fallback_expected:
+ get_table_response = catalog_hms_client.get_table_req(get_table_request)
+ assert get_table_response.table.dbName == db_name
+ assert get_table_response.table.tableName == tbl_name
+ else:
+ exception_received = None
+ try:
+ catalog_hms_client.get_table_req(get_table_request)
+ except Exception as e:
+ exception_received = e
+ assert exception_received is not None
+
+ def __run_partitions_by_names_tests(self, catalog_hms_client, db_name, tbl_name,
+ valid_part_names, expect_fallback=False):
+ """This method runs all the test cases for fetching partitions. The method
+ expects that the partition keys are year and month for the given table and
+ that there are atleast 3 partitions in the table."""
+ # Test get_partitions_by_names_req
+ get_parts_req = GetPartitionsByNamesRequest()
+ get_parts_req.db_name = db_name
+ get_parts_req.tbl_name = tbl_name
+ # must have more than 3 partitions to test the filtering below
+ assert len(valid_part_names) > 3
+ part_names = valid_part_names[:3]
+ part_name_format = "year={0}/month={1}"
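+ # e.g. part_name_format.format(2009, 1) == "year=2009/month=1", the standard
+ # Hive partition-name encoding used in valid_part_names.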
+ get_parts_req.names = part_names
+ response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+ assert response.partitions is not None
+ assert len(response.partitions) == 3
+ self.__validate_partitions(part_names, response, part_name_format)
+ # request partitions with file-metadata
+ get_parts_req = GetPartitionsByNamesRequest()
+ get_parts_req.db_name = db_name
+ get_parts_req.tbl_name = tbl_name
+ get_parts_req.names = part_names
+ get_parts_req.getFileMetadata = True
+ response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+ assert response.partitions is not None
+ assert len(response.partitions) == 3
+ self.__validate_partitions(part_names, response, part_name_format,
+ True)
+ # request contains unavailable partitions
+ get_parts_req = GetPartitionsByNamesRequest()
+ get_parts_req.db_name = db_name
+ get_parts_req.tbl_name = tbl_name
+ part_names = list(valid_part_names)
+ part_names[0] = "year=2009/month=13"
+ get_parts_req.names = part_names
+ get_parts_req.getFileMetadata = True
+ assert get_parts_req.names is not None
+ response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+ assert response.partitions is not None
+ assert len(response.partitions) == 3
+ # remove the bad partition name for the comparison below
+ part_names.pop(0)
+ self.__validate_partitions(part_names, response, part_name_format, True)
+ # request contains empty partition names
+ get_parts_req = GetPartitionsByNamesRequest()
+ get_parts_req.db_name = db_name
+ get_parts_req.tbl_name = tbl_name
+ get_parts_req.names = []
+ response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+ assert response.partitions is not None
+ assert len(response.partitions) == 0
+ # Negative test cases
+ # invalid partition keys
+ get_parts_req = GetPartitionsByNamesRequest()
+ get_parts_req.db_name = db_name
+ get_parts_req.tbl_name = tbl_name
+ get_parts_req.names = ["invalid-key=1"]
+ response = catalog_hms_client.get_partitions_by_names_req(get_parts_req)
+ # in this case we replicate what HMS does, which is to return 0 partitions
+ assert len(response.partitions) == 0
+ # empty table name
+ get_parts_req = GetPartitionsByNamesRequest()
+ get_parts_req.db_name = db_name
+ get_parts_req.tbl_name = ""
+ get_parts_req.names = []
+ if expect_fallback:
+ self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+ "NoSuchObjectException")
+ else:
+ self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+ "Table name is empty or null")
+ # empty db name
+ get_parts_req = GetPartitionsByNamesRequest()
+ get_parts_req.db_name = ""
+ get_parts_req.tbl_name = tbl_name
+ get_parts_req.names = []
+ if expect_fallback:
+ self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+ "NoSuchObjectException")
+ else:
+ self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+ "Database name is empty or null")
+ # table does not exist
+ get_parts_req = GetPartitionsByNamesRequest()
+ get_parts_req.db_name = db_name
+ get_parts_req.tbl_name = "table-does-not-exist"
+ get_parts_req.names = []
+ if expect_fallback:
+ # TODO: HMS actually throws an InvalidObjectException but the HMS API
+ # doesn't declare it in the method signature.
+ self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+ "Internal error")
+ else:
+ self.__get_parts_by_names_expect_exception(catalog_hms_client, get_parts_req,
+ "Table {0}.table-does-not-exist not found".format(
+ db_name))
+
+ @classmethod
+ def __validate_partitions(cls, part_names, get_parts_by_names_result, name_format,
+ expect_files=False):
+ """
+ Validates that the partitions in the response match exactly the given list
+ of partition names.
+ """
+ test_part_names = list(part_names)
+ if expect_files:
+ assert get_parts_by_names_result.dictionary is not None
+ assert len(get_parts_by_names_result.dictionary.values) > 0
+ else:
+ assert get_parts_by_names_result.dictionary is None
+ partitions = get_parts_by_names_result.partitions
+ for part in partitions:
+ assert part is not None
+ # reconstruct the partition name here since the Partition object only has values
+ assert len(part.values) == 2
+ name = name_format.format(part.values[0], part.values[1])
+ assert name in test_part_names
+ if expect_files:
+ assert part.fileMetadata is not None
+ assert part.fileMetadata.data is not None
+ else:
+ assert part.fileMetadata is None
+ test_part_names.remove(name)
+ assert len(test_part_names) == 0
+
+ def __assert_filemd(self, filemetadata, obj_dict):
+ """
+ Util method which asserts that the given file-metadata is valid.
+ """
+ assert filemetadata is not None
+ assert filemetadata.data is not None
+ assert obj_dict is not None
+ assert len(obj_dict.values) > 0
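+ # Note: obj_dict is the dictionary object returned alongside file-metadata in
+ # the HMS response; the file descriptors in filemetadata.data are assumed to
+ # reference shared entries in it, hence both are asserted together.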
+
+ def __assert_no_filemd(self, filemetadata, obj_dict):
+ """
+ Util method which asserts that the given file-metadata is absent. Used to
+ verify that the HMS response objects do not contain file-metadata.
+ """
+ assert filemetadata is None
+ assert obj_dict is None
+
+ @classmethod
+ def __call_get_table_req_expect_exception(cls, client,
+ get_table_request, expected_exception_str=None):
+ exception_received = None
+ try:
+ client.get_table_req(get_table_request)
+ except Exception as e:
+ exception_received = e
+ if expected_exception_str is not None:
+ assert expected_exception_str in str(e)
+ assert exception_received is not None
+
+ @classmethod
+ def __get_parts_by_names_expect_exception(cls, catalog_hms_client,
+ request, expected_exception_str=None):
+ """
+ Calls get_partitions_by_names_req on the HMS client and expects it to
+ fail with the exception message provided in expected_exception_str.
+ """
+ exception = None
+ try:
+ catalog_hms_client.get_partitions_by_names_req(request)
+ except Exception as e:
+ exception = e
+ if expected_exception_str is not None:
+ assert expected_exception_str in str(e)
+ assert exception is not None
+
+ def __compare_cols(self, cols, fieldSchemas):
+ """
+ Compares the given list of fieldSchemas with the expected cols
+ """
+ assert len(cols) == len(fieldSchemas)
+ for col, field_schema in zip(cols, fieldSchemas):
+ assert col[0] == field_schema.name
+ assert col[1] == field_schema.type
+ assert col[2] == field_schema.comment
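+ # Illustrative input: cols is a list of (name, type, comment) tuples, e.g.
+ # [("c1", "int", "id column")], mirroring the FieldSchema objects built by
+ # __create_field_schemas below.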
+
+ def __get_test_database(self, db_name,
+ description="test_db_for_metastore_service"):
+ db = Database()
+ db.name = db_name
+ db.description = description
+ return db
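+ # e.g. self.__get_test_database("test_db_1") builds an in-memory Thrift
+ # Database object that can then be passed to create_database() on the client.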
+
+ def __get_test_tbl(self, db_name, tbl_name, cols, part_cols=None):
+ tbl = Table()
+ tbl.dbName = db_name
+ tbl.tableName = tbl_name
+ sd = StorageDescriptor()
+ tbl.sd = sd
+ sd.cols = self.__create_field_schemas(cols)
+ sd.inputFormat = \
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+ sd.outputFormat = \
+ "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
+ if part_cols is not None:
+ tbl.partitionKeys = self.__create_field_schemas(part_cols)
+ serde = SerDeInfo()
+ serde.name = ""
+ serde.serializationLib = \
+ "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
+ tbl.sd.serdeInfo = serde
+ return tbl
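+ # A hypothetical usage sketch of this helper (names are illustrative only):
+ #
+ #   cols = [("c1", "int", "id column"), ("c2", "string", None)]
+ #   tbl = self.__get_test_tbl("test_db_1", "test_tbl", cols,
+ #                             part_cols=[("year", "int", None)])
+ #   catalog_hms_client.create_table(tbl)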
+
+ def __create_field_schemas(self, cols):
+ fieldSchemas = []
+ for col in cols:
+ f = FieldSchema()
+ f.name = col[0]
+ f.type = col[1]
+ f.comment = col[2]
+ fieldSchemas.append(f)
+ return fieldSchemas
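+ # e.g. __create_field_schemas([("id", "int", None)]) returns one FieldSchema
+ # with name "id", type "int" and no comment.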