You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2021/05/19 09:09:55 UTC

[impala] branch master updated (824b39e -> 26eaa43)

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from 824b39e  IMPALA-10433: Use Iceberg's fixed partition transforms
     new 202ba60  IMPALA-10645: Log catalogd HMS API metrics
     new 9cc4f5e  IMPALA-10678: Support custom SASL protocol name in Kudu client
     new 26eaa43  IMPALA-10695: add dedicated thread pool for OSS/JindoFS.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/global-flags.cc                      |   4 +
 be/src/exec/kudu-util.cc                           |   2 +
 be/src/runtime/io/disk-io-mgr-test.cc              |   4 +-
 be/src/runtime/io/disk-io-mgr.cc                   |   8 +
 be/src/runtime/io/disk-io-mgr.h                    |   4 +
 be/src/util/backend-gflag-util.cc                  |   2 +
 be/src/util/hdfs-util.cc                           |   7 +
 be/src/util/hdfs-util.h                            |   3 +
 bin/impala-config.sh                               |   4 +-
 common/thrift/BackendGflags.thrift                 |   2 +
 common/thrift/JniCatalog.thrift                    |  61 +++++-
 common/thrift/metrics.json                         |  50 +++++
 .../apache/impala/catalog/CatalogHmsAPIHelper.java |   8 +-
 .../impala/catalog/CatalogServiceCatalog.java      |  32 +++
 .../catalog/metastore/CatalogMetastoreServer.java  | 225 +++++++++++++++++++--
 .../metastore/CatalogMetastoreServiceHandler.java  |  16 +-
 ...logMetastoreServer.java => HmsApiNameEnum.java} |  35 ++--
 .../catalog/metastore/ICatalogMetastoreServer.java |   8 +-
 .../catalog/metastore/MetastoreServiceHandler.java |  11 +-
 .../metastore/NoOpCatalogMetastoreServer.java      |   5 +-
 .../impala/catalog/monitor/CatalogMonitor.java     |   8 +
 .../org/apache/impala/service/BackendConfig.java   |   2 +
 .../main/java/org/apache/impala/util/KuduUtil.java |   1 +
 23 files changed, 449 insertions(+), 53 deletions(-)
 copy fe/src/main/java/org/apache/impala/catalog/metastore/{NoOpCatalogMetastoreServer.java => HmsApiNameEnum.java} (57%)

[impala] 01/03: IMPALA-10645: Log catalogd HMS API metrics

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 202ba60c61a7dbcd4dfc5998a5fb94f3b9095d17
Author: kishendas <ki...@cloudera.com>
AuthorDate: Tue Oct 27 16:46:43 2020 -0400

    IMPALA-10645: Log catalogd HMS API metrics
    
    Expose rpc duration, cache hit ratio, etc for Catalogd HMS APIs.
    The metrics currently are only logged at debug level
    when the catalogd starts a HMS endpoint. A followup
    will be done separately to expose them to the debug UI.
    
    This patch was originally contributed by Kishen Das.
    
    Testing:
    1. Deployed the catalogd's metastore server and made sure that
    the metrics are logged in the catalogd.INFO logs.
    
    Change-Id: Id41afe89bbe3395c158919bddd09f302c6752287
    Reviewed-on: http://gerrit.cloudera.org:8080/17284
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 common/thrift/JniCatalog.thrift                    |  61 +++++-
 common/thrift/metrics.json                         |  50 +++++
 .../apache/impala/catalog/CatalogHmsAPIHelper.java |   8 +-
 .../impala/catalog/CatalogServiceCatalog.java      |  32 +++
 .../catalog/metastore/CatalogMetastoreServer.java  | 225 +++++++++++++++++++--
 .../metastore/CatalogMetastoreServiceHandler.java  |  16 +-
 ...logMetastoreServer.java => HmsApiNameEnum.java} |  35 ++--
 .../catalog/metastore/ICatalogMetastoreServer.java |   8 +-
 .../catalog/metastore/MetastoreServiceHandler.java |  11 +-
 .../metastore/NoOpCatalogMetastoreServer.java      |   5 +-
 .../impala/catalog/monitor/CatalogMonitor.java     |   8 +
 11 files changed, 409 insertions(+), 50 deletions(-)

diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 0b591b3..8b7bdef 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -846,13 +846,72 @@ struct TEventProcessorMetrics {
   10: optional i64 last_synced_event_id
 }
 
+struct TCatalogHmsCacheApiMetrics {
+  // name of the API
+  1: required string api_name
+
+  // number of API requests
+  2: optional i64 api_requests
+
+  // p99 response time in milliseconds
+  3: optional double p99_response_time_ms
+
+  // p95 response time in milliseconds
+  4: optional double p95_response_time_ms
+
+  // Mean response time in milliseconds
+  5: optional double response_time_mean_ms
+
+  // Max response time in milliseconds
+  6: optional double response_time_max_ms
+
+  // Min response time in milliseconds
+  7: optional double response_time_min_ms
+
+  // Average number of API requests in 1 minute
+  8: optional double api_requests_1min_rate
+
+  // Average number of API requests in 5 minutes
+  9: optional double api_requests_5min_rate
+
+  // Average number of API requests in 15 minutes
+  10: optional double api_requests_15min_rate
+
+  // Cache hit ratio
+  11: optional double cache_hit_ratio
+}
+
+struct TCatalogdHmsCacheMetrics {
+
+  // API specific Catalogd HMS cache metrics
+  1: required list<TCatalogHmsCacheApiMetrics> api_metrics
+
+  // overall cache hit ratio
+  2: optional double cache_hit_ratio
+
+  // total number of API requests
+  3: optional i64 api_requests
+
+  // Average number of API requests in 1 minute
+  4: optional double api_requests_1min_rate
+
+  // Average number of API requests in 5 minutes
+  5: optional double api_requests_5min_rate
+
+  // Average number of API requests in 15 minutes
+  6: optional double api_requests_15min_rate
+}
+
 // Response to GetCatalogServerMetrics() call.
 struct TGetCatalogServerMetricsResponse {
   // Partial fetch RPC queue length.
   1: required i32 catalog_partial_fetch_rpc_queue_len
 
   // gets the events processor metrics if configured
-  2: optional TEventProcessorMetrics event_metrics;
+  2: optional TEventProcessorMetrics event_metrics
+
+  // get the catalogd Hive metastore server metrics, if configured
+  3: optional TCatalogdHmsCacheMetrics catalogd_hms_cache_metrics
 }
 
 // Request to copy the generated testcase from a given input path.
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 3ac2d8b..b962145 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2864,5 +2864,55 @@
     "units": "NONE",
     "kind": "PROPERTY",
     "key": "admissiond.version"
+  },
+  {
+    "description": "Catalogd HMS cache hit ratio.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Catalogd HMS cache hit ratio",
+    "units": "NONE",
+    "kind" : "GAUGE",
+    "key" : "catalogd.hms.cache.cache.hit.ratio"
+  },
+  {
+    "description": "Catalogd HMS cache API requests.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Status of Catalogd HMS cache",
+    "units": "NONE",
+    "kind" : "COUNTER",
+    "key" : "catalogd.hms.cache.api.requests"
+  },
+  {
+    "description": "Catalogd HMS cache API requests rate for last 1 min.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Catalogd HMS cache API requests rate for last 1 minute",
+    "units": "NONE",
+    "kind" : "GAUGE",
+    "key" : "catalogd.hms.cache.status.api.requests.1min"
+  },
+  {
+    "description": "Catalogd HMS cache API requests rate for last 5 min.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Catalogd HMS cache API requests rate for last 5 minutes",
+    "units": "NONE",
+    "kind" : "GAUGE",
+    "key" : "catalogd.hms.cache.status.api.requests.5min"
+  },
+  {
+    "description": "Catalogd HMS cache API requests rate for last 15 min.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "Catalogd HMS cache API requests rate for last 15 minutes",
+    "units": "NONE",
+    "kind" : "GAUGE",
+    "key" : "catalogd.hms.cache.status.api.requests.15min"
   }
 ]
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
index ddde537..c71547c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogHmsAPIHelper.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.catalog.metastore.CatalogMetastoreServiceHandler;
+import org.apache.impala.catalog.metastore.HmsApiNameEnum;
 import org.apache.impala.common.Pair;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.thrift.CatalogLookupStatus;
@@ -145,7 +146,8 @@ public class CatalogHmsAPIHelper {
     //TODO add table id in the request when client passes it. Also, looks like
     // when HIVE-24662 is resolved, we may not need the table id in this request.
     TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(
-        catalog, reqBuilder.build(), dbName, tblName, "processing get_table_req");
+        catalog, reqBuilder.build(), dbName, tblName,
+        HmsApiNameEnum.GET_TABLE_REQ.apiName());
     // As of writing this code, we know that the table which is returned in the response
     // is a copy of table stored in the catalogd. Should this assumption change in the
     // future, we must make sure that we take a deepCopy() of the table before modifying
@@ -255,7 +257,7 @@ public class CatalogHmsAPIHelper {
     }
     TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
         catalogReq.build(), dbName, tblName,
-        "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_EXPR);
+        HmsApiNameEnum.GET_PARTITION_BY_EXPR.apiName());
     checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
         "%s is not a partitioned table", tableName);
     // create a mapping of the Partition name to the Partition so that we can return the
@@ -377,7 +379,7 @@ public class CatalogHmsAPIHelper {
     // when HIVE-24662 is resolved, we may not need the table id in this request.
     TGetPartialCatalogObjectResponse response = getPartialCatalogObjResponse(catalog,
         requestBuilder.build(), dbName, tblName,
-        "processing " + CatalogMetastoreServiceHandler.GET_PARTITION_BY_NAMES);
+        HmsApiNameEnum.GET_PARTITION_BY_NAMES.apiName());
     checkCondition(response.table_info.hms_table.getPartitionKeys() != null,
         "%s.%s is not a partitioned table", dbName, tblName);
     checkCondition(
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 67bdb26..9a6bbd5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -64,7 +64,9 @@ import org.apache.impala.catalog.events.NoOpEventProcessor;
 import org.apache.impala.catalog.events.SelfEventContext;
 import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.monitor.CatalogTableMetrics;
+import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.catalog.metastore.CatalogMetastoreServer;
+import org.apache.impala.catalog.metastore.HmsApiNameEnum;
 import org.apache.impala.catalog.metastore.ICatalogMetastoreServer;
 import org.apache.impala.catalog.metastore.NoOpCatalogMetastoreServer;
 import org.apache.impala.common.FileSystemUtil;
@@ -78,6 +80,7 @@ import org.apache.impala.service.FeSupport;
 import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.CatalogServiceConstants;
 import org.apache.impala.thrift.TCatalog;
+import org.apache.impala.thrift.TCatalogdHmsCacheMetrics;
 import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
@@ -2209,8 +2212,30 @@ public class CatalogServiceCatalog extends Catalog {
       // the ValidWriteIdList only when the table id matches.
       if (tbl instanceof HdfsTable
           && AcidUtils.compare((HdfsTable) tbl, validWriteIdList, tableId) >= 0) {
+        CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+            .getCounter(CatalogMetastoreServer.CATALOGD_CACHE_HIT_METRIC)
+            .inc();
+        // Update the cache stats for a HMS API from which the current method got invoked.
+        if (HmsApiNameEnum.contains(reason)) {
+          CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+              .getCounter(String
+                  .format(CatalogMetastoreServer.CATALOGD_CACHE_API_HIT_METRIC, reason))
+              .inc();
+        }
         return tbl;
       }
+      CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+          .getCounter(CatalogMetastoreServer.CATALOGD_CACHE_MISS_METRIC)
+          .inc();
+      // Update the cache stats for a HMS API from which the current method got invoked.
+      if (HmsApiNameEnum.contains(reason)) {
+        // Update the cache miss metric, as the valid write id list did not match and we
+        // have to reload the table.
+        CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+            .getCounter(String
+                .format(CatalogMetastoreServer.CATALOGD_CACHE_API_MISS_METRIC, reason))
+            .inc();
+      }
       previousCatalogVersion = tbl.getCatalogVersion();
       loadReq = tableLoadingMgr_.loadAsync(tableName, reason);
     } finally {
@@ -3303,6 +3328,13 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * Gets the Catalogd HMS cache metrics. Used for publishing metrics on the webUI.
+   */
+  public TCatalogdHmsCacheMetrics getCatalogdHmsCacheMetrics() {
+    return catalogMetastoreServer_.getCatalogdHmsCacheMetrics();
+  }
+
+  /**
    * Gets the events processor summary. Used for populating the contents of the events
    * processor detailed view page
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
index 3414f4d..f120478 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServer.java
@@ -17,9 +17,18 @@
 
 package org.apache.impala.catalog.metastore;
 
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import com.codahale.metrics.Timer;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -27,11 +36,17 @@ import java.lang.reflect.Proxy;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
 import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.metastore.HmsApiNameEnum;
+import org.apache.impala.catalog.monitor.CatalogMonitor;
 import org.apache.impala.common.Metrics;
+import org.apache.impala.thrift.TCatalogdHmsCacheMetrics;
+import org.apache.impala.thrift.TCatalogHmsCacheApiMetrics;
 import org.apache.impala.service.BackendConfig;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -80,12 +95,29 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
   // will be blocked until a server thread is closed.
   private static final int MAX_SERVER_THREADS = 500;
 
+  // Metrics for CatalogD HMS cache
   private static final String ACTIVE_CONNECTIONS_METRIC = "metastore.active.connections";
+  public static final String CATALOGD_CACHE_MISS_METRIC = "catalogd-hms-cache.miss";
+  public static final String CATALOGD_CACHE_HIT_METRIC = "catalogd-hms-cache.hit";
+  public static final String CATALOGD_CACHE_API_REQUESTS_METRIC =
+      "catalogd-hms-cache.api-requests";
+
+  // CatalogD HMS Cache - API specific metrics
   private static final String RPC_DURATION_FORMAT_METRIC = "metastore.rpc.duration.%s";
+  public static final String CATALOGD_CACHE_API_MISS_METRIC =
+      "catalogd-hms-cache.cache-miss.api.%s";
+  public static final String CATALOGD_CACHE_API_HIT_METRIC =
+      "catalogd-hms-cache.cache-hit.api.%s";
+
+  public static final Set<String> apiNamesSet_ = new HashSet<>();
 
   // flag to indicate if the server is started or not
   private final AtomicBoolean started_ = new AtomicBoolean(false);
 
+  // Logs Catalogd HMS cache metrics at a fixed frequency.
+  private final ScheduledExecutorService metricsLoggerService_ =
+      Executors.newScheduledThreadPool(1);
+
   // the server is started in a daemon thread so that instantiating this is not
   // a blocking call.
   private CompletableFuture<Void> serverHandle_;
@@ -93,13 +125,21 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
   // reference to the catalog Service catalog object
   private final CatalogServiceCatalog catalog_;
 
-  // Metrics for this Metastore server. Also this instance is passed to the
-  // TServerEventHandler when the server is started so that RPC metrics can be registered.
-  private final Metrics metrics_ = new Metrics();
-
   public CatalogMetastoreServer(CatalogServiceCatalog catalogServiceCatalog) {
     Preconditions.checkNotNull(catalogServiceCatalog);
     catalog_ = catalogServiceCatalog;
+    initMetrics();
+  }
+
+  private void initMetrics() {
+    CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+        .addCounter(CATALOGD_CACHE_MISS_METRIC);
+    CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+        .addCounter(CATALOGD_CACHE_HIT_METRIC);
+    CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+        .addMeter(CATALOGD_CACHE_API_REQUESTS_METRIC);
+    metricsLoggerService_.scheduleAtFixedRate(
+        new MetricsLogger(this), 0, 1, TimeUnit.MINUTES);
   }
 
   /**
@@ -112,14 +152,16 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
 
     @Override
     public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
-      metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).inc();
+      CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+          .getCounter(ACTIVE_CONNECTIONS_METRIC).inc();
       return null;
     }
 
     @Override
     public void deleteContext(ServerContext serverContext, TProtocol tProtocol,
         TProtocol tProtocol1) {
-      metrics_.getCounter(ACTIVE_CONNECTIONS_METRIC).dec();
+      CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+          .getCounter(ACTIVE_CONNECTIONS_METRIC).dec();
     }
 
     @Override
@@ -152,19 +194,46 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
      */
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+      synchronized (apiNamesSet_) {
+        // we synchronize on apiNamesSet_ because the metrics logger thread can be
+        // reading it at the same time.
+        apiNamesSet_.add(method.getName());
+      }
+      CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+          .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+          .mark();
       Timer.Context context =
-          metrics_.getTimer(String.format(RPC_DURATION_FORMAT_METRIC, method.getName()))
+          CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+              .getTimer(String.format(RPC_DURATION_FORMAT_METRIC,
+                  method.getName()) +
+                  Thread.currentThread().getId())
               .time();
+      if (CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+          .getCounter(String.format(CATALOGD_CACHE_API_MISS_METRIC,
+              method.getName())) == null) {
+        CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+            .addCounter(String.format(CATALOGD_CACHE_API_MISS_METRIC,
+                method.getName()));
+        CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+            .addCounter(String.format(CATALOGD_CACHE_API_HIT_METRIC,
+                method.getName()));
+      }
+
       try {
         LOG.debug("Invoking HMS API: {}", method.getName());
         return method.invoke(handler_, args);
       } catch (Exception ex) {
         Throwable unwrapped = unwrap(ex);
-        LOG.error("Received exception while executing " + method.getName() + " : ",
+        LOG.error("Received exception while executing "
+                + method.getName() + " : ",
             unwrapped);
         throw unwrapped;
       } finally {
-        context.stop();
+        long elapsedTime = TimeUnit.NANOSECONDS.toMillis(context.stop());
+        CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+            .getTimer(String.format(RPC_DURATION_FORMAT_METRIC,
+                method.getName()))
+            .update(elapsedTime, TimeUnit.MILLISECONDS);
       }
     }
 
@@ -187,6 +256,28 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
   }
 
   /**
+   * Runnable task which logs the current HMS cache metrics. This is scheduled by
+   * {@link CatalogMetastoreServer} so that we print the metrics of the APIs which are
+   * called regularly in the log. The metrics are currently logged only at debug level,
+   * so this is mostly useful for debugging purposes.
+   * TODO Remove this and expose the metrics in the catalogd's debug UI.
+   */
+  private static class MetricsLogger implements Runnable {
+
+    private final CatalogMetastoreServer server_;
+
+    public MetricsLogger(CatalogMetastoreServer server) {
+      this.server_ = server;
+    }
+
+    @Override
+    public void run() {
+      TCatalogdHmsCacheMetrics metrics = server_.getCatalogdHmsCacheMetrics();
+      LOG.debug("CatalogdHMSCacheMetrics : {}", metrics.toString());
+    }
+  }
+
+  /**
    * Starts the thrift server in a background thread and the configured port. Currently,
    * only support NOSASL mode. TODO Add SASL and ssl support (IMPALA-10638)
    *
@@ -198,7 +289,7 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
     Preconditions.checkState(!started_.get(), "Metastore server is already started");
     LOG.info("Starting the Metastore server at port number {}", portNumber);
     CatalogMetastoreServiceHandler handler =
-        new CatalogMetastoreServiceHandler(catalog_, metrics_,
+        new CatalogMetastoreServiceHandler(catalog_,
             BackendConfig.INSTANCE.fallbackToHMSOnErrors());
     // create a proxy class for the ThriftMetastore.Iface and ICatalogMetastoreServer
     // so that all the APIs can be invoked via a TimingInvocationHandler
@@ -268,12 +359,122 @@ public class CatalogMetastoreServer extends ThriftHiveMetastore implements
   }
 
   /**
    * Returns the RPC and connection metrics for this metastore server. //TODO hook this
    * method to the Catalog's debug UI
+   *
+   * The returned struct carries the overall cache hit ratio and API request rates,
+   * plus per-API request counts, response-time statistics and, for the APIs listed in
+   * HmsApiNameEnum, the per-API cache hit ratio.
    */
   @Override
-  public String getMetrics() {
-    return metrics_.toString();
+  public TCatalogdHmsCacheMetrics getCatalogdHmsCacheMetrics() {
+    long apiRequests = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+        .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+        .getCount();
+    double cacheHitRatio =
+        getHitRatio(CATALOGD_CACHE_HIT_METRIC, CATALOGD_CACHE_MISS_METRIC);
+    double apiRequestsOneMinute =
+        CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+            .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+            .getOneMinuteRate();
+    double apiRequestsFiveMinutes =
+        CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+            .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+            .getFiveMinuteRate();
+    double apiRequestsFifteenMinutes =
+        CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+            .getMeter(CATALOGD_CACHE_API_REQUESTS_METRIC)
+            .getFifteenMinuteRate();
+
+    TCatalogdHmsCacheMetrics catalogdHmsCacheMetrics = new TCatalogdHmsCacheMetrics();
+
+    List<TCatalogHmsCacheApiMetrics> apiMetricsList = new ArrayList<>();
+    catalogdHmsCacheMetrics.setApi_metrics(apiMetricsList);
+
+    catalogdHmsCacheMetrics.setCache_hit_ratio(cacheHitRatio);
+    catalogdHmsCacheMetrics.setApi_requests(apiRequests);
+    catalogdHmsCacheMetrics.setApi_requests_1min_rate(apiRequestsOneMinute);
+    catalogdHmsCacheMetrics.setApi_requests_5min_rate(apiRequestsFiveMinutes);
+    catalogdHmsCacheMetrics.setApi_requests_15min_rate(apiRequestsFifteenMinutes);
+
+    HashSet<String> apiNames;
+    synchronized (apiNamesSet_) {
+      // we synchronize apiNamesSet_ here because a concurrent invoke() method could
+      // be modifying it at the same time.
+      apiNames = new HashSet<>(apiNamesSet_);
+    }
+    for (String apiName : apiNames) {
+      TCatalogHmsCacheApiMetrics apiMetrics = new TCatalogHmsCacheApiMetrics();
+      apiMetricsList.add(apiMetrics);
+      double specificApiP95ResponseTime =
+          CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+              .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+              .getSnapshot()
+              .get95thPercentile();
+      double specificApiP99ResponseTime =
+          CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+              .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+              .getSnapshot()
+              .get99thPercentile();
+      long specificApiRequests =
+          CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+              .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+              .getCount();
+      // we collect the cache hit ratio metrics only for the APIs which we serve from
+      // catalogd server.
+      if (HmsApiNameEnum.contains(apiName)) {
+        double specificApiCacheHitRatio =
+            getHitRatio(String.format(CATALOGD_CACHE_API_HIT_METRIC, apiName),
+                String.format(CATALOGD_CACHE_API_MISS_METRIC, apiName));
+        apiMetrics.setCache_hit_ratio(specificApiCacheHitRatio);
+      }
+      double specificApiRequestsOneMinute =
+          CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+              .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+              .getOneMinuteRate();
+      double specificApiRequestsFiveMinutes =
+          CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+              .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+              .getFiveMinuteRate();
+      double specificApiRequestsFifteenMinutes =
+          CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+              .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+              .getFifteenMinuteRate();
+      long max = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+          .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+          .getSnapshot()
+          .getMax();
+      long min = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+          .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+          .getSnapshot()
+          .getMin();
+      double mean = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+          .getTimer(String.format(RPC_DURATION_FORMAT_METRIC, apiName))
+          .getSnapshot()
+          .getMean();
+      apiMetrics.setApi_name(apiName);
+      apiMetrics.setApi_requests(specificApiRequests);
+      apiMetrics.setP99_response_time_ms(specificApiP99ResponseTime);
+      apiMetrics.setP95_response_time_ms(specificApiP95ResponseTime);
+      apiMetrics.setResponse_time_mean_ms(mean);
+      apiMetrics.setResponse_time_max_ms(max);
+      apiMetrics.setResponse_time_min_ms(min);
+      apiMetrics.setApi_requests_1min_rate(specificApiRequestsOneMinute);
+      apiMetrics.setApi_requests_5min_rate(specificApiRequestsFiveMinutes);
+      apiMetrics.setApi_requests_15min_rate(specificApiRequestsFifteenMinutes);
+    }
+    return catalogdHmsCacheMetrics;
+  }
+
+  /**
+   * Returns the hit ratio given the metric names for the hits and misses.
+   */
+  private double getHitRatio(String hitMetric, String missMetric) {
+    long hitCount = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+        .getCounter(hitMetric).getCount();
+    long missCount = CatalogMonitor.INSTANCE.getCatalogdHmsCacheMetrics()
+        .getCounter(missMetric).getCount();
+    return ((double) hitCount) / (hitCount + missCount);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
index be00d69..a192c7c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/CatalogMetastoreServiceHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.catalog.metastore;
 
+import static org.apache.impala.catalog.metastore.HmsApiNameEnum.GET_PARTITION_BY_NAMES;
+
 import org.apache.hadoop.hive.metastore.api.GetFieldsRequest;
 import org.apache.hadoop.hive.metastore.api.GetFieldsResponse;
 import org.apache.hadoop.hive.metastore.api.GetPartitionsByNamesRequest;
@@ -45,6 +47,8 @@ import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogHmsAPIHelper;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.common.Metrics;
+import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
+import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.service.BackendConfig;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
@@ -61,9 +65,9 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
   private static final Logger LOG = LoggerFactory
       .getLogger(CatalogMetastoreServiceHandler.class);
 
-  public CatalogMetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+  public CatalogMetastoreServiceHandler(CatalogServiceCatalog catalog,
       boolean fallBackToHMSOnErrors) {
-    super(catalog, metrics, fallBackToHMSOnErrors);
+    super(catalog, fallBackToHMSOnErrors);
   }
 
   @Override
@@ -113,11 +117,13 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
       }
     } catch (Exception e) {
       // we catch the CatalogException and fall-back to HMS
-      throwIfNoFallback(e, GET_PARTITION_BY_EXPR);
+      throwIfNoFallback(e, HmsApiNameEnum.GET_PARTITION_BY_EXPR.apiName());
     }
     String tblName =
         partitionsByExprRequest.getDbName() + "." + partitionsByExprRequest.getTblName();
-    LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_EXPR, tblName));
+    LOG.info(String
+        .format(HMS_FALLBACK_MSG_FORMAT, HmsApiNameEnum.GET_PARTITION_BY_EXPR.apiName(),
+            tblName));
     return super.get_partitions_by_expr(partitionsByExprRequest);
   }
 
@@ -141,7 +147,7 @@ public class CatalogMetastoreServiceHandler extends MetastoreServiceHandler {
       return CatalogHmsAPIHelper
           .getPartitionsByNames(catalog_, serverConf_, getPartitionsByNamesRequest);
     } catch (Exception ex) {
-      throwIfNoFallback(ex, GET_PARTITION_BY_NAMES);
+      throwIfNoFallback(ex, GET_PARTITION_BY_NAMES.apiName());
     }
     String tblName =
         getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/HmsApiNameEnum.java
similarity index 57%
copy from fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
copy to fe/src/main/java/org/apache/impala/catalog/metastore/HmsApiNameEnum.java
index 399ed09..eeb3357 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/HmsApiNameEnum.java
@@ -17,28 +17,31 @@
 
 package org.apache.impala.catalog.metastore;
 
-import org.apache.impala.catalog.CatalogException;
-
 /**
- * NoOpCatalogMetastoreServer which is used when metastore service is not configured.
+ * HmsApiNameEnum lists the names of the HMS APIs that are served from the CatalogD
+ * HMS cache when CatalogD caching is enabled.
  */
-public class NoOpCatalogMetastoreServer implements ICatalogMetastoreServer {
-  public static final NoOpCatalogMetastoreServer INSTANCE =
-      new NoOpCatalogMetastoreServer();
-  private static final String EMPTY = "";
+public enum HmsApiNameEnum {
+  GET_TABLE_REQ("get_table_req"),
+  GET_PARTITION_BY_EXPR("get_partitions_by_expr"),
+  GET_PARTITION_BY_NAMES("get_partitions_by_names_req");
+
+  private final String apiName;
 
-  @Override
-  public void start() throws CatalogException {
-    // no-op
+  HmsApiNameEnum(String apiName) {
+    this.apiName = apiName;
   }
 
-  @Override
-  public void stop() throws CatalogException {
-    // no-op
+  public String apiName() {
+    return apiName;
   }
 
-  @Override
-  public String getMetrics() {
-    return EMPTY;
+  public static boolean contains(String apiName) {
+    for (HmsApiNameEnum api : HmsApiNameEnum.values()) {
+      if (api.name().equals(apiName)) {
+        return true;
+      }
+    }
+    return false;
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
index e5aa7a9..c1ecda0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/ICatalogMetastoreServer.java
@@ -18,6 +18,7 @@
 package org.apache.impala.catalog.metastore;
 
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.thrift.TCatalogdHmsCacheMetrics;
 
 /**
  * This is the main Interface which a CatalogMetastore service should implement. It
@@ -39,9 +40,8 @@ public interface ICatalogMetastoreServer {
   void stop() throws CatalogException;
 
   /**
-   * Returns the metrics for this Catalog Metastore service.
-   * @return Encoded String (eg. Json format) representation of all the metrics for this
-   * metastore service.
+   * Returns the metrics for this CatalogD HMS Cache.
+   * @return TCatalogdHmsCacheMetrics.
    */
-  String getMetrics();
+  TCatalogdHmsCacheMetrics getCatalogdHmsCacheMetrics();
 }
\ No newline at end of file
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
index 9a1b6ba..13ad92f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/MetastoreServiceHandler.java
@@ -262,7 +262,6 @@ import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.impala.catalog.CatalogHmsAPIHelper;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
-import org.apache.impala.common.Metrics;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.util.AcidUtils;
 import org.apache.thrift.TException;
@@ -293,20 +292,16 @@ public abstract class MetastoreServiceHandler implements Iface {
       + "table %s to the backing HiveMetastore service";
 
   // constant used for logging error messages
-  public static final String GET_PARTITION_BY_EXPR = "get_partitions_by_expr";
-  public static final String GET_PARTITION_BY_NAMES = "get_partitions_by_names_req";
   protected final CatalogServiceCatalog catalog_;
   protected final boolean fallBackToHMSOnErrors_;
-  protected final Metrics metrics_;
   // TODO handle session configuration
   protected Configuration serverConf_;
   protected PartitionExpressionProxy expressionProxy_;
   protected final String defaultCatalogName_;
 
-  public MetastoreServiceHandler(CatalogServiceCatalog catalog, Metrics metrics,
+  public MetastoreServiceHandler(CatalogServiceCatalog catalog,
       boolean fallBackToHMSOnErrors) {
     catalog_ = Preconditions.checkNotNull(catalog);
-    metrics_ = Preconditions.checkNotNull(metrics);
     fallBackToHMSOnErrors_ = fallBackToHMSOnErrors;
     LOG.info("Fallback to hive metastore service on errors is {}",
         fallBackToHMSOnErrors_);
@@ -1306,7 +1301,9 @@ public abstract class MetastoreServiceHandler implements Iface {
     String tblName =
         getPartitionsByNamesRequest.getDb_name() + "." + getPartitionsByNamesRequest
             .getTbl_name();
-    LOG.info(String.format(HMS_FALLBACK_MSG_FORMAT, GET_PARTITION_BY_NAMES, tblName));
+    LOG.info(String
+        .format(HMS_FALLBACK_MSG_FORMAT, HmsApiNameEnum.GET_PARTITION_BY_NAMES.apiName(),
+            tblName));
     boolean getFileMetadata = getPartitionsByNamesRequest.isGetFileMetadata();
     GetPartitionsByNamesResult result;
     ValidWriteIdList validWriteIdList = null;
diff --git a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
index 399ed09..c2433a0 100644
--- a/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
+++ b/fe/src/main/java/org/apache/impala/catalog/metastore/NoOpCatalogMetastoreServer.java
@@ -18,6 +18,7 @@
 package org.apache.impala.catalog.metastore;
 
 import org.apache.impala.catalog.CatalogException;
+import org.apache.impala.thrift.TCatalogdHmsCacheMetrics;
 
 /**
  * NoOpCatalogMetastoreServer which is used when metastore service is not configured.
@@ -38,7 +39,7 @@ public class NoOpCatalogMetastoreServer implements ICatalogMetastoreServer {
   }
 
   @Override
-  public String getMetrics() {
-    return EMPTY;
+  public TCatalogdHmsCacheMetrics getCatalogdHmsCacheMetrics() {
+    return new TCatalogdHmsCacheMetrics();
   }
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
index 7454f61..488f669 100644
--- a/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/monitor/CatalogMonitor.java
@@ -17,6 +17,8 @@
 
 package org.apache.impala.catalog.monitor;
 
+import org.apache.impala.common.Metrics;
+
 /**
  * Singleton class that helps monitor catalog, monitoring classes can be
  * accessed through the CatalogMonitor.
@@ -28,6 +30,8 @@ public final class CatalogMonitor {
 
   private final CatalogOperationMetrics catalogOperationUsage_;
 
+  private final Metrics catalogdHmsCacheMetrics_ = new Metrics();
+
   private CatalogMonitor() {
     catalogTableMetrics_ = CatalogTableMetrics.INSTANCE;
     catalogOperationUsage_ = CatalogOperationMetrics.INSTANCE;
@@ -38,4 +42,8 @@ public final class CatalogMonitor {
   public CatalogOperationMetrics getCatalogOperationMetrics() {
     return catalogOperationUsage_;
   }
+
+  public Metrics getCatalogdHmsCacheMetrics() {
+    return catalogdHmsCacheMetrics_;
+  }
 }

[impala] 02/03: IMPALA-10678: Support custom SASL protocol name in Kudu client

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 9cc4f5e2b97ee5c06f70d28b6f64ca8ff0194059
Author: wzhou-code <wz...@cloudera.com>
AuthorDate: Mon May 10 16:05:52 2021 -0700

    IMPALA-10678: Support custom SASL protocol name in Kudu client
    
    This patch adds a configurable flag, kudu_sasl_protocol_name, and calls
    the Kudu client API to set the SASL protocol name when creating the
    Kudu client in the FE and BE.
    It also upgrades the toolchain to pull in a new version of Kudu that
    provides the new Java/C++ client APIs for setting the SASL protocol name.
    
    Testing:
     - Passed core run.
    
    Change-Id: I0fb0b50f5e42e8a720564e51ad6c6185b51e3647
    Reviewed-on: http://gerrit.cloudera.org:8080/17442
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/common/global-flags.cc                                 | 4 ++++
 be/src/exec/kudu-util.cc                                      | 2 ++
 be/src/util/backend-gflag-util.cc                             | 2 ++
 bin/impala-config.sh                                          | 4 ++--
 common/thrift/BackendGflags.thrift                            | 2 ++
 fe/src/main/java/org/apache/impala/service/BackendConfig.java | 2 ++
 fe/src/main/java/org/apache/impala/util/KuduUtil.java         | 1 +
 7 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index 5a4ca3c..098562b 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -205,6 +205,10 @@ DEFINE_int32(kudu_client_connection_negotiation_timeout_ms, 3000,
     "(Advanced) Timeout for connection negotiation between Kudu client and "
     "Kudu masters and tablet servers, in milliseconds");
 
+// SASL protocol name for Kudu used in the FE and BE when creating Kudu client.
+// The default name is "kudu".
+DEFINE_string(kudu_sasl_protocol_name, "kudu", "SASL protocol name for Kudu");
+
 DEFINE_int64(inc_stats_size_limit_bytes, 200 * (1LL<<20), "Maximum size of "
     "incremental stats the catalog is allowed to serialize per table. "
     "This limit is set as a safety check, to prevent the JVM from "
diff --git a/be/src/exec/kudu-util.cc b/be/src/exec/kudu-util.cc
index a623273..b2d5368 100644
--- a/be/src/exec/kudu-util.cc
+++ b/be/src/exec/kudu-util.cc
@@ -55,6 +55,7 @@ DEFINE_int32(kudu_client_v, -1,
 DECLARE_bool(disable_kudu);
 DECLARE_int32(kudu_client_rpc_timeout_ms);
 DECLARE_int32(kudu_client_connection_negotiation_timeout_ms);
+DECLARE_string(kudu_sasl_protocol_name);
 
 namespace impala {
 
@@ -97,6 +98,7 @@ Status CreateKuduClient(const vector<string>& master_addrs,
     LOG(WARNING) << "Ignoring value of --kudu_client_num_reactor_threads: "
                  << FLAGS_kudu_client_num_reactor_threads;
   }
+  b.sasl_protocol_name(FLAGS_kudu_sasl_protocol_name);
   KUDU_RETURN_IF_ERROR(b.Build(client), "Unable to create Kudu client");
   return Status::OK();
 }
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index a15b633..de87481 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -95,6 +95,7 @@ DECLARE_bool(start_hms_server);
 DECLARE_int32(hms_port);
 DECLARE_bool(fallback_to_hms_on_errors);
 DECLARE_bool(enable_catalogd_hms_cache);
+DECLARE_string(kudu_sasl_protocol_name);
 
 // HS2 SAML2.0 configuration
 // Defined here because TAG_FLAG caused issues in global-flags.cc
@@ -293,6 +294,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_hms_port(FLAGS_hms_port);
   cfg.__set_fallback_to_hms_on_errors(FLAGS_fallback_to_hms_on_errors);
   cfg.__set_enable_catalogd_hms_cache(FLAGS_enable_catalogd_hms_cache);
+  cfg.__set_kudu_sasl_protocol_name(FLAGS_kudu_sasl_protocol_name);
   return Status::OK();
 }
 
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index b965d6c..662aec2 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -68,7 +68,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=15-4a1bf8d9bc
+export IMPALA_TOOLCHAIN_BUILD_ID=20-87becfe485
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p5
@@ -666,7 +666,7 @@ fi
 # overall build type) and does not apply when using a local Kudu build.
 export USE_KUDU_DEBUG_BUILD=${USE_KUDU_DEBUG_BUILD-false}
 
-export IMPALA_KUDU_VERSION=${IMPALA_KUDU_VERSION-"b5e7362e69"}
+export IMPALA_KUDU_VERSION=${IMPALA_KUDU_VERSION-"f486f0813a"}
 export IMPALA_KUDU_HOME=${IMPALA_TOOLCHAIN_PACKAGES_HOME}/kudu-$IMPALA_KUDU_VERSION
 export IMPALA_KUDU_JAVA_HOME=\
 ${IMPALA_TOOLCHAIN_PACKAGES_HOME}/kudu-${IMPALA_KUDU_VERSION}/java
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index c483469..a2e5680 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -205,4 +205,6 @@ struct TBackendGflags {
   90: required bool fallback_to_hms_on_errors
 
   91: required bool enable_catalogd_hms_cache
+
+  92: required string kudu_sasl_protocol_name
 }
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 7ae4dc7..8b0525c 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -74,6 +74,8 @@ public class BackendConfig {
   }
   public int getKuduClientTimeoutMs() { return backendCfg_.kudu_operation_timeout_ms; }
 
+  public String getKuduSaslProtocolName() { return backendCfg_.kudu_sasl_protocol_name; }
+
   public String getImpalaBuildVersion() { return backendCfg_.impala_build_version; }
   public int getImpalaLogLevel() { return backendCfg_.impala_log_lvl; }
   public int getNonImpalaJavaVlogLevel() { return backendCfg_.non_impala_java_vlog; }
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 7568623..4ddb734 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -93,6 +93,7 @@ public class KuduUtil {
       b.defaultAdminOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
       b.defaultOperationTimeoutMs(BackendConfig.INSTANCE.getKuduClientTimeoutMs());
       b.workerCount(KUDU_CLIENT_WORKER_THREAD_COUNT);
+      b.saslProtocolName(BackendConfig.INSTANCE.getKuduSaslProtocolName());
       client = b.build();
       kuduClients_.put(kuduMasters, client);
     }

[impala] 03/03: IMPALA-10695: add dedicated thread pool for OSS/JindoFS.

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 26eaa43dbe6f99a6f97f4d3e0eafb1cf7c24217c
Author: Yong Yang <yo...@163.com>
AuthorDate: Fri May 14 10:40:03 2021 +0000

    IMPALA-10695: add dedicated thread pool for OSS/JindoFS.
    
    OSS is the object store in Alibaba Cloud, much like s3a,
    and JindoFS is a gateway built on the Alibaba Cloud object store.
    For more information about JindoFS, see:
    https://www.alibabacloud.com/blog/introducing-jindofs-a-high-performance-data-lake-storage-solution_595600
    Without this change, the Alibaba object store is treated as a local
    disk, and query performance is poor.
    This change creates a dedicated queue for this kind of target,
    which improves OSS scan performance.
    I have tested it in our environment
    and observed at least double the scan speed.
    
    New flags:
     - num_oss_io_threads: Number of OSS/JindoFS I/O threads. Defaults to 16.
    
    Change-Id: I4643105628f3860e3145c85d9ed205fe20291add
    Signed-off-by: Yong Yang <yo...@163.com>
    Reviewed-on: http://gerrit.cloudera.org:8080/17455
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/disk-io-mgr-test.cc | 4 +++-
 be/src/runtime/io/disk-io-mgr.cc      | 8 ++++++++
 be/src/runtime/io/disk-io-mgr.h       | 4 ++++
 be/src/util/hdfs-util.cc              | 7 +++++++
 be/src/util/hdfs-util.h               | 3 +++
 5 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 540a444..d4421fa 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -54,6 +54,7 @@ DECLARE_int32(num_adls_io_threads);
 DECLARE_int32(num_abfs_io_threads);
 DECLARE_int32(num_gcs_io_threads);
 DECLARE_int32(num_ozone_io_threads);
+DECLARE_int32(num_oss_io_threads);
 DECLARE_int32(num_remote_hdfs_file_oper_io_threads);
 DECLARE_int32(num_s3_file_oper_io_threads);
 
@@ -1716,7 +1717,8 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);
   const int num_io_threads_for_remote_disks = FLAGS_num_remote_hdfs_io_threads
       + FLAGS_num_s3_io_threads + FLAGS_num_adls_io_threads + FLAGS_num_abfs_io_threads
-      + FLAGS_num_ozone_io_threads + FLAGS_num_remote_hdfs_file_oper_io_threads
+      + FLAGS_num_ozone_io_threads + FLAGS_num_oss_io_threads
+      + FLAGS_num_remote_hdfs_file_oper_io_threads
       + FLAGS_num_s3_file_oper_io_threads + FLAGS_num_gcs_io_threads;
 
   // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index a8ced4a..05f0ff4 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -124,6 +124,9 @@ DEFINE_int32(num_s3_file_oper_io_threads, 16, "Number of S3 file operations I/O
 // The maximum number of ABFS I/O threads. TODO: choose the default empirically.
 DEFINE_int32(num_abfs_io_threads, 16, "Number of ABFS I/O threads");
 
+// The maximum number of OSS/JindoFS I/O threads. TODO: choose the default empirically.
+DEFINE_int32(num_oss_io_threads, 16, "Number of OSS/JindoFS I/O threads");
+
 // The maximum number of ADLS I/O threads. This number is a good default to have for
 // clusters that may vary widely in size, due to an undocumented concurrency limit
 // enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters
@@ -468,6 +471,9 @@ Status DiskIoMgr::Init() {
     } else if (i == RemoteAdlsDiskId()) {
       num_threads_per_disk = FLAGS_num_adls_io_threads;
       device_name = "ADLS remote";
+    } else if (i == RemoteOSSDiskId()) {
+      num_threads_per_disk = FLAGS_num_oss_io_threads;
+      device_name = "OSS remote";
     } else if (i == RemoteGcsDiskId()) {
       num_threads_per_disk = FLAGS_num_gcs_io_threads;
       device_name = "GCS remote";
@@ -826,6 +832,7 @@ int DiskIoMgr::AssignQueue(
     if (IsS3APath(file, check_default_fs)) return RemoteS3DiskId();
     if (IsABFSPath(file, check_default_fs)) return RemoteAbfsDiskId();
     if (IsADLSPath(file, check_default_fs)) return RemoteAdlsDiskId();
+    if (IsOSSPath(file, check_default_fs)) return RemoteOSSDiskId();
     if (IsGcsPath(file, check_default_fs)) return RemoteGcsDiskId();
     if (IsOzonePath(file, check_default_fs)) return RemoteOzoneDiskId();
   }
@@ -833,6 +840,7 @@ int DiskIoMgr::AssignQueue(
   DCHECK(!IsS3APath(file, check_default_fs)); // S3 is always remote.
   DCHECK(!IsABFSPath(file, check_default_fs)); // ABFS is always remote.
   DCHECK(!IsADLSPath(file, check_default_fs)); // ADLS is always remote.
+  DCHECK(!IsOSSPath(file, check_default_fs)); // OSS/JindoFS is always remote.
   DCHECK(!IsGcsPath(file, check_default_fs)); // GCS is always remote.
   DCHECK(!IsOzonePath(file, check_default_fs)); // Ozone is always remote.
   if (disk_id == -1) {
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index d05df42..4745a7c 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -327,6 +327,9 @@ class DiskIoMgr : public CacheLineAligned {
   /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses.
   int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; }
 
+  /// The disk ID (and therefore disk_queues_ index) used for OSS/JindoFS accesses.
+  int RemoteOSSDiskId() const { return num_local_disks() + REMOTE_OSS_DISK_OFFSET; }
+
   /// The disk ID (and therefore disk_queues_ index) used for GCS accesses.
   int RemoteGcsDiskId() const { return num_local_disks() + REMOTE_GCS_DISK_OFFSET; }
 
@@ -390,6 +393,7 @@ class DiskIoMgr : public CacheLineAligned {
     REMOTE_OZONE_DISK_OFFSET,
     REMOTE_DFS_DISK_FILE_OPER_OFFSET,
     REMOTE_S3_DISK_FILE_OPER_OFFSET,
+    REMOTE_OSS_DISK_OFFSET,
     REMOTE_NUM_DISKS
   };
 
diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index e0ee54e..9b560fe 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -36,6 +36,8 @@ const char* FILESYS_PREFIX_ABFS_SEC = "abfss://";
 const char* FILESYS_PREFIX_ADL = "adl://";
 const char* FILESYS_PREFIX_GCS = "gs://";
 const char* FILESYS_PREFIX_OZONE = "o3fs://";
+const char* FILESYS_PREFIX_OSS = "oss://";
+const char* FILESYS_PREFIX_JINDOFS = "jfs://";
 
 string GetHdfsErrorMsg(const string& prefix, const string& file) {
   string error_msg = GetStrErrMsg();
@@ -109,6 +111,11 @@ bool IsADLSPath(const char* path, bool check_default_fs) {
   return IsSpecificPath(path, FILESYS_PREFIX_ADL, check_default_fs);
 }
 
+bool IsOSSPath(const char* path, bool check_default_fs) {
+  return IsSpecificPath(path, FILESYS_PREFIX_OSS, check_default_fs)
+      || IsSpecificPath(path, FILESYS_PREFIX_JINDOFS, check_default_fs);
+}
+
 bool IsGcsPath(const char* path, bool check_default_fs) {
   return IsSpecificPath(path, FILESYS_PREFIX_GCS, check_default_fs);
 }
diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h
index 835db65..f150299 100644
--- a/be/src/util/hdfs-util.h
+++ b/be/src/util/hdfs-util.h
@@ -67,6 +67,9 @@ bool IsABFSPath(const char* path, bool check_default_fs = true);
 /// Returns true iff the path refers to a location on an ADL filesystem.
 bool IsADLSPath(const char* path, bool check_default_fs = true);
 
+/// Returns true iff the path refers to a location on an OSS or JindoFS filesystem.
+bool IsOSSPath(const char* path, bool check_default_fs = true);
+
 /// Returns true iff the path refers to a location on an GCS filesystem.
 bool IsGcsPath(const char* path, bool check_default_fs = true);