You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2021/05/19 09:09:56 UTC
[impala] 01/03: IMPALA-10645: Log catalogd HMS API metrics
This is an automated email from the ASF dual-hosted git repository.
boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 202ba60c61a7dbcd4dfc5998a5fb94f3b9095d17
Author: kishendas <ki...@cloudera.com>
AuthorDate: Tue Oct 27 16:46:43 2020 -0400
IMPALA-10645: Log catalogd HMS API metrics
Expose RPC duration, cache hit ratio, etc. for catalogd HMS APIs.
The metrics are currently only logged at debug level
when catalogd starts an HMS endpoint. A follow-up change
will expose them in the debug UI.
This patch was originally contributed by Kishen Das.
Testing:
1. Deployed the catalogd's metastore server and made sure that
the metrics are logged in the catalogd.INFO logs.
Change-Id: Id41afe89bbe3395c158919bddd09f302c6752287
Reviewed-on: http://gerrit.cloudera.org:8080/17284
Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
common/thrift/JniCatalog.thrift | 61 +++++-
common/thrift/metrics.json | 50 +++++
.../apache/impala/catalog/CatalogHmsAPIHelper.java | 8 +-
.../impala/catalog/CatalogServiceCatalog.java | 32 +++
.../catalog/metastore/CatalogMetastoreServer.java | 225 +++++++++++++++++++--
.../metastore/CatalogMetastoreServiceHandler.java | 16 +-
...logMetastoreServer.java => HmsApiNameEnum.java} | 35 ++--
.../catalog/metastore/ICatalogMetastoreServer.java | 8 +-
.../catalog/metastore/MetastoreServiceHandler.java | 11 +-
.../metastore/NoOpCatalogMetastoreServer.java | 5 +-
.../impala/catalog/monitor/CatalogMonitor.java | 8 +
11 files changed, 409 insertions(+), 50 deletions(-)
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 0b591b3..8b7bdef 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -846,13 +846,72 @@ struct TEventProcessorMetrics {
10: optional i64 last_synced_event_id
}
+struct TCatalogHmsCacheApiMetrics {
+ // name of the API
+ 1: required string api_name
+
+ // number of API requests
+ 2: optional i64 api_requests
+
+ // p99 response time in milliseconds
+ 3: optional double p99_response_time_ms
+
+ // p95 response time in milliseconds
+ 4: optional double p95_response_time_ms
+
+ // Mean response time in milliseconds
+ 5: optional double response_time_mean_ms
+
+ // Max response time in milliseconds
+ 6: optional double response_time_max_ms
+
+ // Min response time in milliseconds
+ 7: optional double response_time_min_ms
+
+ // Average number of API requests in 1 minute
+ 8: optional double api_requests_1min_rate
+
+ // Average number of API requests in 5 minutes
+ 9: optional double api_requests_5min_rate
+
+ // Average number of API requests in 15 min
+ 10: optional double api_requests_15min_rate
+
+ // Cache hit ratio
+ 11: optional double cache_hit_ratio
+}
+
+struct TCatalogdHmsCacheMetrics {
+
+ // API specific Catalogd HMS cache metrics
+ 1: required list<TCatalogHmsCacheApiMetrics> api_metrics
+
+ // overall cache hit ratio
+ 2: optional double cache_hit_ratio
+
+ // total number of API requests
+ 3: optional i64 api_requests
+
+ // Average number of API requests in 1 minute
+ 4: optional double api_requests_1min_rate
+
+ // Average number of API requests in 5 minutes
+ 5: optional double api_requests_5min_rate
+
+ // Average number of API requests in 15 min
+ 6: optional double api_requests_15min_rate
+}
+
// Response to GetCatalogServerMetrics() call.
struct TGetCatalogServerMetricsResponse {
// Partial fetch RPC queue length.
1: required i32 catalog_partial_fetch_rpc_queue_len
// gets the events processor metrics if configured
- 2: optional TEventProcessorMetrics event_metrics;
+ 2: optional TEventProcessorMetrics event_metrics
+
+ // get the catalogd Hive metastore server metrics, if configured
+ 3: optional TCatalogdHmsCacheMetrics catalogd_hms_cache_metrics
}
// Request to copy the generated testcase from a given input path.
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 3ac2d8b..b962145 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2864,5 +2864,55 @@
"units": "NONE",
"kind": "PROPERTY",
"key": "admissiond.version"
+ },
+ {
+ "description": "Catalogd HMS cache hit ratio.",
+ "contexts": [
+ "CATALOGSERVER"
+ ],
+ "label": "Catalogd HMS cache hit ratio",
+ "units": "NONE",
+ "kind" : "GAUGE",
+ "key" : "catalogd.hms.cache.cache.hit.ratio"
+ },
+ {
+ "description": "Catalogd HMS cache API requests.",
+ "contexts": [
+ "CATALOGSERVER"
+ ],
+ "label": "Status of Catalogd HMS cache",
+ "units": "NONE",
+ "kind" : "COUNTER",
+ "key" : "catalogd.hms.cache.api.requests"
+ },
+ {
+ "description": "Catalogd HMS cache API requests rate for last 1 min.",
+ "contexts": [
+ "CATALOGSERVER"
+ ],
+ "label": "Catalogd HMS cache API requests rate for last 1 minute",
+ "units": "NONE",
+ "kind" : "GAUGE",
+ "key" : "catalogd.hms.cache.status.api.requests.1min"
+ },
+ {
+ "description": "Catalogd HMS cache API requests rate for last 5 min.",
+ "contexts": [
+ "CATALOGSERVER"
+ ],
+ "label": "Catalogd HMS cache API requests rate for last 5 minutes",
+ "units": "NONE",
+ "kind" : "GAUGE",
+ "key" : "catalogd.hms.cache.status.api.requests.5min"
+ },
+ {
+ "description": "Catalogd HMS cache API requests rate for last 15 min.",
+ "contexts": [
+ "CATALOGSERVER"
+ ],
+ "label": "Catalogd HMS cache API requests rate for last 15 minutes",
+ "units": "NONE",
+ "kind" : "GAUGE",
+ "key" : "catalogd.hms.cache.status.api.requests.15min"
}
]
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
index ddde537..c71547c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.impala.analysis.TableName;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.metastore.CatalogMetastoreServiceHandler;
+import org.apache.impala.catalog.metastore.HmsApiNameEnum;
import org.apache.impala.common.Pair;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.CatalogLookupStatus;
@@ -145,7 +146,8 @@ public class CatalogHmsAPIHelper {
//TODO add table id in the request when client passes it. Also, looks like
// when HIVE-24662 is resolved, we may not need the table id in this request.
TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(
- catalog, reqBuilder.build(), dbName, tblName, "processing get_table_req");
+ catalog, reqBuilder.build(), dbName, tblName,
+ HmsApiNameEnum.GET_TABLE_REQ.apiName());
// As of writing this code, we know that the table which is returned in the response
// is a copy of table stored in the catalogd. Should this assumption change in the
// future, we must make sure that we take a deepCopy() of the table before modifying
@@ -255,7 +257,7 @@ public class CatalogHmsAPIHelper {
}
TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
catalogReq.build(), dbName, tblName,
- "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_EXPR);
+ HmsApiNameEnum.GET_PARTITION_BY_EXPR.apiName());
checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
"%s is not a partitioned table", tableName);
// create a mapping of the Partition name to the Partition so that we can return the
@@ -377,7 +379,7 @@ public class CatalogHmsAPIHelper {
// when HIVE-24662 is resolved, we may not need the table id in this request.
TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
requestBuilder.build(), dbName, tblName,
- "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_NAMES);
+ HmsApiNameEnum.GET_PARTITION_BY_NAMES.apiName());
checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
"%s.%s is not a partitioned table", dbName, tblName);
checkCondition(
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 67bdb26..9a6bbd5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -64,7 +64,9 @@ import org.apache.impala.catalog.events.NoOpEventProcessor;
import org.apache.impala.catalog.events.SelfEventContext;
import org.apache.impala.catalog.monitor.CatalogMonitor;
import org.apache.impala.catalog.monitor.CatalogTableMetrics;
+import org.apache.impala.catalog.monitor.CatalogMonitor;
import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
+import org.apache.impala.catalog.metastore.HmsApiNameEnum;
import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
import org.apache.impala.common.FileSystemUtil;
@@ -78,6 +80,7 @@ import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.CatalogLookupStatus;
import org.apache.impala.thrift.CatalogServiceConstants;
import org.apache.impala.thrift.TCatalog;
+import org.apache.impala.thrift.TCatalogdHmsCacheMetrics;
import org.apache.impala.thrift.TCatalogInfoSelector;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
@@ -2209,8 +2212,30 @@ public class CatalogServiceCatalog extends Catalog {
// the ValidWriteIdList only when the table id matches.
if (tbl instanceof HdfsTable
&& AcidUtils.compare((HdfsTable) tbl, validWriteIdList, tableId) >= 0) {
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getCounter(CatalogMetastoreServer.CATALOGD_CACHE_HIT_METRIC)
+ .inc();
+ // Update the cache stats for a HMS API from which the current method got invoked.
+ if (HmsApiNameEnum.contains(reason)) {
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getCounter(String
+ .format(CatalogMetastoreServer.CATALOGD_CACHE_API_HIT_METRIC, reason))
+ .inc();
+ }
return tbl;
}
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getCounter(CatalogMetastoreServer.CATALOGD_CACHE_MISS_METRIC)
+ .inc();
+ // Update the cache stats for a HMS API from which the current method got invoked.
+ if (HmsApiNameEnum.contains(reason)) {
+ // Update the cache miss metric, as the valid write id list did not match and we
+ // have to reload the table.
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getCounter(String
+ .format(CatalogMetastoreServer.CATALOGD_CACHE_API_MISS_METRIC, reason))
+ .inc();
+ }
previousCatalogVersion = tbl.getCatalogVersion();
loadReq = tableLoadingMgr_.loadAsync(tableName, reason);
} finally {
@@ -3303,6 +3328,13 @@ public class CatalogServiceCatalog extends Catalog {
}
/**
+ * Gets the Catalogd HMS cache metrics. Used for publishing metrics on the webUI.
+ */
+ public TCatalogdHmsCacheMetrics getCatalogdHmsCacheMetrics() {
+ return catalogMetastoreServer_.getCatalogdHmsCacheMetrics();
+ }
+
+ /**
* Gets the events processor summary. Used for populating the contents of the events
* processor detailed view page
*/
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
index 3414f4d..f120478 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
@@ -17,9 +17,18 @@
package org.apache.impala.catalog.metastore;
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@@ -27,11 +36,17 @@ import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.metastore.HmsApiNameEnum;
+import org.apache.impala.catalog.monitor.CatalogMonitor;
import org.apache.impala.common.Metrics;
+import org.apache.impala.thrift.TCatalogdHmsCacheMetrics;
+import org.apache.impala.thrift.TCatalogHmsCacheApiMetrics;
import org.apache.impala.service.BackendConfig;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -80,12 +95,29 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
// will be blocked until a server thread is closed.
private static final int MAX_SERVER_THREADS = 500;
+ // Metrics for CatalogD HMS cache
private static final String ACTIVE_CONNECTIONS_METRIC = "metastore.active.connections";
+ public static final String CATALOGD_CACHE_MISS_METRIC = "catalogd-hms-cache.miss";
+ public static final String CATALOGD_CACHE_HIT_METRIC = "catalogd-hms-cache.hit";
+ public static final String CATALOGD_CACHE_API_REQUESTS_METRIC =
+ "catalogd-hms-cache.api-requests";
+
+ // CatalogD HMS Cache - API specific metrics
private static final String RPC_DURATION_FORMAT_METRIC = "metastore.rpc.duration.%s";
+ public static final String CATALOGD_CACHE_API_MISS_METRIC =
+ "catalogd-hms-cache.cache-miss.api.%s";
+ public static final String CATALOGD_CACHE_API_HIT_METRIC =
+ "catalogd-hms-cache.cache-hit.api.%s";
+
+ public static final Set<String> apiNamesSet_ = new HashSet<>();
// flag to indicate if the server is started or not
private final AtomicBoolean started_ = new AtomicBoolean(false);
+ // Logs Catalogd HMS cache metrics at a fixed frequency.
+ private final ScheduledExecutorService metricsLoggerService_ =
+ Executors.newScheduledThreadPool(1);
+
// the server is started in a daemon thread so that instantiating this is not
// a blocking call.
private CompletableFuture<Void> serverHandle_;
@@ -93,13 +125,21 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
// reference to the catalog Service catalog object
private final CatalogServiceCatalog catalog_;
- // Metrics for this Metastore server. Also this instance is passed to the
- // TServerEventHandler when the server is started so that RPC metrics can be registered.
- private final Metrics metrics_ = new Metrics();
-
public CatalogMetastoreServer(CatalogServiceCatalog catalogServiceCatalog) {
Preconditions.checkNotNull(catalogServiceCatalog);
catalog_ = catalogServiceCatalog;
+ initMetrics();
+ }
+
+ private void initMetrics() {
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .addCounter(CATALOGD_CACHE_MISS_METRIC);
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .addCounter(CATALOGD_CACHE_HIT_METRIC);
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .addMeter(CATALOGD_CACHE_API_REQUESTS_METRIC);
+ metricsLoggerService_.scheduleAtFixedRate(
+ new MetricsLogger(this), 0, 1, TimeUnit.MINUTES);
}
/**
@@ -112,14 +152,16 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
@Override
public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
- metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).inc();
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getCounter(ACTIVE_CONNECTIONS_METRIC).inc();
return null;
}
@Override
public void deleteContext(ServerContext serverContext, TProtocol tProtocol,
TProtocol tProtocol1) {
- metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).dec();
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getCounter(ACTIVE_CONNECTIONS_METRIC).dec();
}
@Override
@@ -152,19 +194,46 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+ synchronized (apiNamesSet_) {
+ // we synchronize on apiNamesSet_ because the metrics logger thread can be
+ // reading it at the same time.
+ apiNamesSet_.add(method.getName());
+ }
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+ .mark();
Timer.Context context =
- metrics_.getTimer(String.format(RPC_DURATION_FORMAT_METRIC, method.getName()))
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC,
+ method.getName()) +
+ Thread.currentThread().getId())
.time();
+ if (CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getCounter(String.format(CATALOGD_CACHE_API_MISS_METRIC,
+ method.getName())) == null) {
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .addCounter(String.format(CATALOGD_CACHE_API_MISS_METRIC,
+ method.getName()));
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .addCounter(String.format(CATALOGD_CACHE_API_HIT_METRIC,
+ method.getName()));
+ }
+
try {
LOG.debug("Invoking HMS API: {}", method.getName());
return method.invoke(handler_, args);
} catch (Exception ex) {
Throwable unwrapped = unwrap(ex);
- LOG.error("Received exception while executing " + method.getName() + " : ",
+ LOG.error("Received exception while executing "
+ + method.getName() + " : ",
unwrapped);
throw unwrapped;
} finally {
- context.stop();
+ long elapsedTime = TimeUnit.NANOSECONDS.toMillis(context.stop());
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC,
+ method.getName()))
+ .update(elapsedTime, TimeUnit.MILLISECONDS);
}
}
@@ -187,6 +256,28 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
}
/**
+ * Runnable task which logs the current HMS cache metrics. It is scheduled by
+ * {@link CatalogMetastoreServer} so that the metrics of the APIs which have been
+ * called are printed to the log at a regular interval. The metrics are currently
+ * logged only at debug level, so this is mostly useful for debugging.
+ * TODO Remove this and expose the metrics in the catalogd's debug UI.
+ */
+ private static class MetricsLogger implements Runnable {
+
+ private final CatalogMetastoreServer server_;
+
+ public MetricsLogger(CatalogMetastoreServer server) {
+ this.server_ = server;
+ }
+
+ @Override
+ public void run() {
+ TCatalogdHmsCacheMetrics metrics = server_.getCatalogdHmsCacheMetrics();
+ LOG.debug("CatalogdHMSCacheMetrics : {}", metrics.toString());
+ }
+ }
+
+ /**
* Starts the thrift server in a background thread and the configured port. Currently,
* only support NOSASL mode. TODO Add SASL and ssl support (IMPALA-10638)
*
@@ -198,7 +289,7 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
Preconditions.checkState(!started_.get(), "Metastore server is already started");
LOG.info("Starting the Metastore server at port number {}", portNumber);
CatalogMetastoreServiceHandler handler =
- new CatalogMetastoreServiceHandler(catalog_, metrics_,
+ new CatalogMetastoreServiceHandler(catalog_,
BackendConfig.INSTANCE.fallbackToHMSOnErrors());
// create a proxy class for the ThriftMetastore.Iface and ICatalogMetastoreServer
// so that all the APIs can be invoked via a TimingInvocationHandler
@@ -268,12 +359,122 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
}
/**
* Returns the RPC and connection metrics for this metastore server. //TODO hook this
* method to the Catalog's debug UI
*/
@Override
- public String getMetrics() {
- return metrics_.toString();
+ public TCatalogdHmsCacheMetrics getCatalogdHmsCacheMetrics() {
+ long apiRequests = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+ .getCount();
+ double cacheHitRatio =
+ getHitRatio(CATALOGD_CACHE_HIT_METRIC, CATALOGD_CACHE_MISS_METRIC);
+ double apiRequestsOneMinute =
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+ .getOneMinuteRate();
+ double apiRequestsFiveMinutes =
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+ .getFiveMinuteRate();
+ double apiRequestsFifteenMinutes =
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+ .getFifteenMinuteRate();
+
+ TCatalogdHmsCacheMetrics catalogdHmsCacheMetrics = new TCatalogdHmsCacheMetrics();
+
+ List<TCatalogHmsCacheApiMetrics> apiMetricsList = new ArrayList<>();
+ catalogdHmsCacheMetrics.setApi_metrics(apiMetricsList);
+
+ catalogdHmsCacheMetrics.setCache_hit_ratio(cacheHitRatio);
+ catalogdHmsCacheMetrics.setApi_requests(apiRequests);
+ catalogdHmsCacheMetrics.setApi_requests_1min_rate(apiRequestsOneMinute);
+ catalogdHmsCacheMetrics.setApi_requests_5min_rate(apiRequestsFiveMinutes);
+ catalogdHmsCacheMetrics.setApi_requests_15min_rate(apiRequestsFifteenMinutes);
+
+ HashSet<String> apiNames;
+ synchronized (apiNamesSet_) {
+ // we synchronize apiNamesSet_ here because a concurrent invoke() method could
+ // be modifying it at the same time.
+ apiNames = new HashSet<>(apiNamesSet_);
+ }
+ for (String apiName : apiNames) {
+ TCatalogHmsCacheApiMetrics apiMetrics = new TCatalogHmsCacheApiMetrics();
+ apiMetricsList.add(apiMetrics);
+ double specificApiP95ResponseTime =
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+ .getSnapshot()
+ .get95thPercentile();
+ double specificApiP99ResponseTime =
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+ .getSnapshot()
+ .get99thPercentile();
+ long specificApiRequests =
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+ .getCount();
+ // we collect the cache hit ratio metrics only for the APIs which we serve from
+ // catalogd server.
+ if (HmsApiNameEnum.contains(apiName)) {
+ double specificApiCacheHitRatio =
+ getHitRatio(String.format(CATALOGD_CACHE_API_HIT_METRIC, apiName),
+ String.format(CATALOGD_CACHE_API_MISS_METRIC, apiName));
+ apiMetrics.setCache_hit_ratio(specificApiCacheHitRatio);
+ }
+ double specificApiRequestsOneMinute =
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+ .getOneMinuteRate();
+ double specificApiRequestsFiveMinutes =
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+ .getFiveMinuteRate();
+ double specificApiRequestsFifteenMinutes =
+ CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+ .getFifteenMinuteRate();
+ long max = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+ .getSnapshot()
+ .getMax();
+ long min = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+ .getSnapshot()
+ .getMin();
+ double mean = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+ .getSnapshot()
+ .getMean();
+ apiMetrics.setApi_name(apiName);
+ apiMetrics.setApi_requests(specificApiRequests);
+ apiMetrics.setP99_response_time_ms(specificApiP99ResponseTime);
+ apiMetrics.setP95_response_time_ms(specificApiP95ResponseTime);
+ apiMetrics.setResponse_time_mean_ms(mean);
+ apiMetrics.setResponse_time_max_ms(max);
+ apiMetrics.setResponse_time_min_ms(min);
+ apiMetrics.setApi_requests_1min_rate(specificApiRequestsOneMinute);
+ apiMetrics.setApi_requests_5min_rate(specificApiRequestsFiveMinutes);
+ apiMetrics.setApi_requests_15min_rate(specificApiRequestsFifteenMinutes);
+ }
+ return catalogdHmsCacheMetrics;
+ }
+
+ /**
+ * Returns the hit ratio given the metric names for the hits and misses.
+ */
+ private double getHitRatio(String hitMetric, String missMetric) {
+ long hitCount = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getCounter(hitMetric).getCount();
+ long missCount = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+ .getCounter(missMetric).getCount();
+ return ((double) hitCount) / (hitCount + missCount);
}
/**
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
index be00d69..a192c7c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
@@ -17,6 +17,8 @@
package org.apache.impala.catalog.metastore;
+import static org.apache.impala.catalog.metastore.HmsApiNameEnum.GET_PARTITION_BY_NAMES;
+
import org.apache.hadoop.hive.metastore.api.GetFieldsRequest;
import org.apache.hadoop.hive.metastore.api.GetFieldsResponse;
import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
@@ -45,6 +47,8 @@ import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.CatalogHmsAPIHelper;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.common.Metrics;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -61,9 +65,9 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
private static final Logger LOG = LoggerFactory
.getLogger(CatalogMetastoreServiceHandler.class);
- public CatalogMetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+ public CatalogMetastoreServiceHandler(CatalogServiceCatalog catalog,
boolean fallBackToHMSOnErrors) {
- super(catalog, metrics, fallBackToHMSOnErrors);
+ super(catalog, fallBackToHMSOnErrors);
}
@Override
@@ -113,11 +117,13 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
}
} catch (Exception e) {
// we catch the CatalogException and fall-back to HMS
- throwIfNoFallback(e, GET_PARTITION_BY_EXPR);
+ throwIfNoFallback(e, HmsApiNameEnum.GET_PARTITION_BY_EXPR.apiName());
}
String tblName =
partitionsByExprRequest.getDbName() + "." + partitionsByExprRequest.getTblName();
- LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_EXPR, tblName));
+ LOG.info(String
+ .format(HMS_FALLBACK_MSG_FORMAT, HmsApiNameEnum.GET_PARTITION_BY_EXPR.apiName(),
+ tblName));
return super.get_partitions_by_expr(partitionsByExprRequest);
}
@@ -141,7 +147,7 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
return CatalogHmsAPIHelper
.getPartitionsByNames(catalog_, serverConf_, getPartitionsByNamesRequest);
} catch (Exception ex) {
- throwIfNoFallback(ex, GET_PARTITION_BY_NAMES);
+ throwIfNoFallback(ex, GET_PARTITION_BY_NAMES.apiName());
}
String tblName =
getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/HmsApiNameEnum.java
similarity index 57%
copy from fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
copy to fe/src/main/java/org/apache/impala/catalog/metastore/HmsApiNameEnum.java
index 399ed09..eeb3357 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/HmsApiNameEnum.java
@@ -17,28 +17,31 @@
package org.apache.impala.catalog.metastore;
-import org.apache.impala.catalog.CatalogException;
-
/**
- * NoOpCatalogMetastoreServer which is used when metastore service is not configured.
+ * HmsApiNameEnum lists the names of the HMS APIs that are served from the CatalogD HMS
+ * cache, if CatalogD caching is enabled.
*/
-public class NoOpCatalogMetastoreServer implements ICatalogMetastoreServer {
- public static final NoOpCatalogMetastoreServer INSTANCE =
- new NoOpCatalogMetastoreServer();
- private static final String EMPTY = "";
+public enum HmsApiNameEnum {
+ GET_TABLE_REQ("get_table_req"),
+ GET_PARTITION_BY_EXPR("get_partitions_by_expr"),
+ GET_PARTITION_BY_NAMES("get_partitions_by_names_req");
+
+ private final String apiName;
- @Override
- public void start() throws CatalogException {
- // no-op
+ HmsApiNameEnum(String apiName) {
+ this.apiName = apiName;
}
- @Override
- public void stop() throws CatalogException {
- // no-op
+ public String apiName() {
+ return apiName;
}
- @Override
- public String getMetrics() {
- return EMPTY;
+ public static boolean contains(String apiName) {
+ for (HmsApiNameEnum api : HmsApiNameEnum.values()) {
+ if (api.name().equals(apiName)) {
+ return true;
+ }
+ }
+ return false;
}
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
index e5aa7a9..c1ecda0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
@@ -18,6 +18,7 @@
package org.apache.impala.catalog.metastore;
import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.thrift.TCatalogdHmsCacheMetrics;
/**
* This is the main Interface which a CatalogMetastore service should implement. It
@@ -39,9 +40,8 @@ public interface ICatalogMetastoreServer {
void stop() throws CatalogException;
/**
- * Returns the metrics for this Catalog Metastore service.
- * @return Encoded String (eg. Json format) representation of all the metrics for this
- * metastore service.
+ * Returns the metrics for this CatalogD HMS Cache.
+ * @return TCatalogdHmsCacheMetrics.
*/
- String getMetrics();
+ TCatalogdHmsCacheMetrics getCatalogdHmsCacheMetrics();
}
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
index 9a1b6ba..13ad92f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -262,7 +262,6 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.impala.catalog.CatalogHmsAPIHelper;
import org.apache.impala.catalog.CatalogServiceCatalog;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
-import org.apache.impala.common.Metrics;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.util.AcidUtils;
import org.apache.thrift.TException;
@@ -293,20 +292,16 @@ public abstract class MetastoreServiceHandler implements Iface {
+ "table %s to the backing HiveMetastore service";
// constant used for logging error messages
- public static final String GET_PARTITION_BY_EXPR = "get_partitions_by_expr";
- public static final String GET_PARTITION_BY_NAMES = "get_partitions_by_names_req";
protected final CatalogServiceCatalog catalog_;
protected final boolean fallBackToHMSOnErrors_;
- protected final Metrics metrics_;
// TODO handle session configuration
protected Configuration serverConf_;
protected PartitionExpressionProxy expressionProxy_;
protected final String defaultCatalogName_;
- public MetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+ public MetastoreServiceHandler(CatalogServiceCatalog catalog,
boolean fallBackToHMSOnErrors) {
catalog_ = Preconditions.checkNotNull(catalog);
- metrics_ = Preconditions.checkNotNull(metrics);
fallBackToHMSOnErrors_ = fallBackToHMSOnErrors;
LOG.info("Fallback to hive metastore service on errors is {}",
fallBackToHMSOnErrors_);
@@ -1306,7 +1301,9 @@ public abstract class MetastoreServiceHandler implements Iface {
String tblName =
getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
.getTbl_name();
- LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_NAMES, tblName));
+ LOG.info(String
+ .format(HMS_FALLBACK_MSG_FORMAT, HmsApiNameEnum.GET_PARTITION_BY_NAMES.apiName(),
+ tblName));
boolean getFileMetadata = getPartitionsByNamesRequest.isGetFileMetadata();
GetPartitionsByNamesResult result;
ValidWriteIdList validWriteIdList = null;
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
index 399ed09..c2433a0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
@@ -18,6 +18,7 @@
package org.apache.impala.catalog.metastore;
import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.thrift.TCatalogdHmsCacheMetrics;
/**
* NoOpCatalogMetastoreServer which is used when metastore service is not configured.
@@ -38,7 +39,7 @@ public class NoOpCatalogMetastoreServer implements ICatalogMetastoreServer {
}
@Override
- public String getMetrics() {
- return EMPTY;
+ public TCatalogdHmsCacheMetrics getCatalogdHmsCacheMetrics() {
+ return new TCatalogdHmsCacheMetrics();
}
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
index 7454f61..488f669 100644
--- a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
@@ -17,6 +17,8 @@
package org.apache.impala.catalog.monitor;
+import org.apache.impala.common.Metrics;
+
/**
* Singleton class that helps monitor catalog, monitoring classes can be
* accessed through the CatalogMonitor.
@@ -28,6 +30,8 @@ public final class CatalogMonitor {
private final CatalogOperationMetrics catalogOperationUsage_;
+ private final Metrics catalogdHmsCacheMetrics_ = new Metrics();
+
private CatalogMonitor() {
catalogTableMetrics_ = CatalogTableMetrics.INSTANCE;
catalogOperationUsage_ = CatalogOperationMetrics.INSTANCE;
@@ -38,4 +42,8 @@ public final class CatalogMonitor {
public CatalogOperationMetrics getCatalogOperationMetrics() {
return catalogOperationUsage_;
}
+
+ public Metrics getCatalogdHmsCacheMetrics() {
+ return catalogdHmsCacheMetrics_;
+ }
}