You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bh...@apache.org on 2018/10/08 17:12:02 UTC

[6/6] impala git commit: IMPALA-7626: Throttle catalog partial RPC requests

IMPALA-7626: Throttle catalog partial RPC requests

With more coordinators running in local catalog mode, the expected RPC
traffic on the Catalog server is higher than in non-local-catalog mode.
Each such RPC is handled in its own thread and consumes non-trivial CPU
for serializing and deserializing the metadata.

With this change, the number of threads performing the actual work is
capped at a configurable limit at any point, and the remaining requests (if any)
are blocked until the current requests are serviced or until they exceed
the configured timeout and abort. This change adds the following parameters to
control this behavior.

--catalog_max_parallel_partial_fetch_rpc
--catalog_partial_fetch_rpc_queue_timeout_s

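--catalog_max_parallel_partial_fetch_rpc (default 32) caps the number of
partial catalog object fetch RPCs that can run in parallel.
--catalog_partial_fetch_rpc_queue_timeout_s controls how long (in seconds) a
queued request waits for its turn before it times out.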

Added a metric, catalog.partial-fetch-rpc.queue-len, to expose the current
queue length for supportability.

Added a unit test to make sure the number of concurrent requests for this RPC
method does not exceed the configured value.

Change-Id: I11f77a16cfa38ada42d8b7c859850198ea7dd142
Reviewed-on: http://gerrit.cloudera.org:8080/11561
Reviewed-by: Vuk Ercegovac <ve...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d48ffc2d
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d48ffc2d
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d48ffc2d

Branch: refs/heads/master
Commit: d48ffc2d45b2a9d4b9c730bba5677d3096311a25
Parents: 843683e
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Sun Sep 30 23:20:34 2018 -0700
Committer: Bharath Vissapragada <bh...@cloudera.com>
Committed: Mon Oct 8 17:09:40 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                | 43 +++++++--
 be/src/catalog/catalog-server.h                 | 10 +++
 be/src/catalog/catalog.cc                       |  5 ++
 be/src/catalog/catalog.h                        |  5 ++
 be/src/util/backend-gflag-util.cc               |  6 ++
 common/thrift/BackendGflags.thrift              |  4 +
 common/thrift/JniCatalog.thrift                 |  6 ++
 common/thrift/metrics.json                      | 10 +++
 .../impala/catalog/CatalogServiceCatalog.java   | 65 +++++++++++++-
 .../apache/impala/service/BackendConfig.java    |  8 ++
 .../org/apache/impala/service/JniCatalog.java   |  9 ++
 .../impala/catalog/PartialCatalogInfoTest.java  | 93 +++++++++++++++++++-
 .../testutil/CatalogServiceTestCatalog.java     |  3 +-
 13 files changed, 255 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index d4ce559..c4ee301 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -62,6 +62,14 @@ DEFINE_validator(catalog_topic_mode, [](const char* name, const string& val) {
   return false;
 });
 
+DEFINE_int32_hidden(catalog_max_parallel_partial_fetch_rpc, 32, "Maximum number of "
+    "partial catalog object fetch RPCs that can run in parallel. Applicable only when "
+    "local catalog mode is configured.");
+
+DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maximum time "
+    "(in seconds) a partial catalog object fetch RPC spends in the queue waiting "
+    "to run. Must be set to a value greater than zero.");
+
 DECLARE_string(state_store_host);
 DECLARE_int32(state_store_subscriber_port);
 DECLARE_int32(state_store_port);
@@ -73,6 +81,9 @@ string CatalogServer::IMPALA_CATALOG_TOPIC = "catalog-update";
 const string CATALOG_SERVER_TOPIC_PROCESSING_TIMES =
     "catalog-server.topic-processing-time-s";
 
+const string CATALOG_SERVER_PARTIAL_FETCH_RPC_QUEUE_LEN =
+    "catalog.partial-fetch-rpc.queue-len";
+
 const string CATALOG_WEB_PAGE = "/catalog";
 const string CATALOG_TEMPLATE = "catalog.tmpl";
 const string CATALOG_OBJECT_WEB_PAGE = "/catalog_object";
@@ -80,6 +91,8 @@ const string CATALOG_OBJECT_TEMPLATE = "catalog_object.tmpl";
 const string TABLE_METRICS_WEB_PAGE = "/table_metrics";
 const string TABLE_METRICS_TEMPLATE = "table_metrics.tmpl";
 
+const int REFRESH_METRICS_INTERVAL_MS = 1000;
+
 // Implementation for the CatalogService thrift interface.
 class CatalogServiceThriftIf : public CatalogServiceIf {
  public:
@@ -218,6 +231,8 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
     catalog_objects_max_version_(0L) {
   topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
       CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
+  partial_fetch_rpc_queue_len_metric_ =
+      metrics->AddGauge(CATALOG_SERVER_PARTIAL_FETCH_RPC_QUEUE_LEN, 0);
 }
 
 Status CatalogServer::Start() {
@@ -230,13 +245,11 @@ Status CatalogServer::Start() {
 
   // This will trigger a full Catalog metadata load.
   catalog_.reset(new Catalog());
-  Status status = Thread::Create("catalog-server", "catalog-update-gathering-thread",
+  RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-update-gathering-thread",
       &CatalogServer::GatherCatalogUpdatesThread, this,
-      &catalog_update_gathering_thread_);
-  if (!status.ok()) {
-    status.AddDetail("CatalogService failed to start");
-    return status;
-  }
+      &catalog_update_gathering_thread_));
+  RETURN_IF_ERROR(Thread::Create("catalog-server", "catalog-metrics-refresh-thread",
+      &CatalogServer::RefreshMetrics, this, &catalog_metrics_refresh_thread_));
 
   statestore_subscriber_.reset(new StatestoreSubscriber(
      Substitute("catalog-server@$0", TNetworkAddressToString(server_address)),
@@ -249,7 +262,7 @@ Status CatalogServer::Start() {
   // prefix of any key. This saves a bit of network communication from the statestore
   // back to the catalog.
   string filter_prefix = "!";
-  status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC,
+  Status status = statestore_subscriber_->AddTopic(IMPALA_CATALOG_TOPIC,
       /* is_transient=*/ false, /* populate_min_subscriber_topic_version=*/ false,
       filter_prefix, cb);
   if (!status.ok()) {
@@ -328,7 +341,7 @@ void CatalogServer::UpdateCatalogTopicCallback(
 }
 
 [[noreturn]] void CatalogServer::GatherCatalogUpdatesThread() {
-  while (1) {
+  while (true) {
     unique_lock<mutex> unique_lock(catalog_lock_);
     // Protect against spurious wake-ups by checking the value of topic_updates_ready_.
     // It is only safe to continue on and update the shared pending_topic_updates_
@@ -366,6 +379,20 @@ void CatalogServer::UpdateCatalogTopicCallback(
   }
 }
 
+[[noreturn]] void CatalogServer::RefreshMetrics() {
+  while (true) {
+    SleepForMs(REFRESH_METRICS_INTERVAL_MS);
+    TGetCatalogServerMetricsResponse response;
+    Status status = catalog_->GetCatalogServerMetrics(&response);
+    if (!status.ok()) {
+      LOG(ERROR) << "Error refreshing catalog metrics: " << status.GetDetail();
+      continue;
+    }
+    partial_fetch_rpc_queue_len_metric_->SetValue(
+        response.catalog_partial_fetch_rpc_queue_len);
+  }
+}
+
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
   GetCatalogUsage(document);

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 1df83a3..76ec2be 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -90,9 +90,15 @@ class CatalogServer {
   /// Metric that tracks the amount of time taken preparing a catalog update.
   StatsMetric<double>* topic_processing_time_metric_;
 
+  /// Tracks the partial fetch RPC call queue length on the Catalog server.
+  IntGauge* partial_fetch_rpc_queue_len_metric_;
+
   /// Thread that polls the catalog for any updates.
   std::unique_ptr<Thread> catalog_update_gathering_thread_;
 
+  /// Thread that periodically wakes up and refreshes certain Catalog metrics.
+  std::unique_ptr<Thread> catalog_metrics_refresh_thread_;
+
   /// Protects catalog_update_cv_, pending_topic_updates_,
   /// catalog_objects_to/from_version_, and last_sent_catalog_version.
   boost::mutex catalog_lock_;
@@ -143,6 +149,9 @@ class CatalogServer {
   /// Also, explicitly releases free memory back to the OS after each complete iteration.
   [[noreturn]] void GatherCatalogUpdatesThread();
 
+  /// Executed by the catalog_metrics_refresh_thread_. Refreshes certain catalog metrics.
+  [[noreturn]] void RefreshMetrics();
+
   /// Example output:
   /// "databases": [
   ///         {
@@ -200,6 +209,7 @@ class CatalogServer {
   /// <host>:25020/table_metrics?name=foo.bar
   void TableMetricsUrlCallback(const Webserver::ArgumentMap& args,
       rapidjson::Document* document);
+
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index a8873b7..a2d5f4b 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -67,6 +67,7 @@ Catalog::Catalog() {
     {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
     {"getCatalogUsage", "()[B", &get_catalog_usage_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
+    {"getCatalogServerMetrics", "()[B", &get_catalog_server_metrics_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_},
     {"getPartitionStats", "([B)[B", &get_partition_stats_id_},
     {"updateTableUsage", "([B)V", &update_table_usage_id_},
@@ -158,6 +159,10 @@ Status Catalog::GetCatalogUsage(TGetCatalogUsageResponse* response) {
   return JniUtil::CallJniMethod(catalog_, get_catalog_usage_id_, response);
 }
 
+Status Catalog::GetCatalogServerMetrics(TGetCatalogServerMetricsResponse* response) {
+  return JniUtil::CallJniMethod(catalog_, get_catalog_server_metrics_, response);
+}
+
 Status Catalog::GetFunctions(const TGetFunctionsRequest& request,
     TGetFunctionsResponse *response) {
   return JniUtil::CallJniMethod(catalog_, get_functions_id_, request, response);

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 65390a4..c703bcb 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -100,6 +100,10 @@ class Catalog {
   Status GetTableMetrics(const std::string& db, const std::string& tbl,
       std::string* metrics);
 
+  /// Returns the Catalog server metrics in the response object. Refer to
+  /// TGetCatalogServerMetricsResponse definition for details.
+  Status GetCatalogServerMetrics(TGetCatalogServerMetricsResponse* response);
+
   /// Returns the current catalog usage that includes the most frequently accessed
   /// tables as well as the tables with the highest memory requirements.
   Status GetCatalogUsage(TGetCatalogUsageResponse* response);
@@ -139,6 +143,7 @@ class Catalog {
   jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
   jmethodID get_catalog_usage_id_; // JniCatalog.getCatalogUsage()
+  jmethodID get_catalog_server_metrics_; // JniCatalog.getCatalogServerMetrics()
   jmethodID get_dbs_id_; // JniCatalog.getDbs()
   jmethodID get_table_names_id_; // JniCatalog.getTableNames()
   jmethodID get_table_metrics_id_; // JniCatalog.getTableMetrics()

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/be/src/util/backend-gflag-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 09975a8..3760c84 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -65,6 +65,8 @@ DECLARE_double(invalidate_tables_fraction_on_memory_pressure);
 DECLARE_int32(local_catalog_max_fetch_retries);
 DECLARE_int64(kudu_scanner_thread_estimated_bytes_per_column);
 DECLARE_int64(kudu_scanner_thread_max_estimated_bytes);
+DECLARE_int32(catalog_max_parallel_partial_fetch_rpc);
+DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
 
 namespace impala {
 
@@ -126,6 +128,10 @@ Status GetThriftBackendGflags(JNIEnv* jni_env, jbyteArray* cfg_bytes) {
       FLAGS_kudu_scanner_thread_estimated_bytes_per_column);
   cfg.__set_kudu_scanner_thread_max_estimated_bytes(
       FLAGS_kudu_scanner_thread_max_estimated_bytes);
+  cfg.__set_catalog_max_parallel_partial_fetch_rpc(
+      FLAGS_catalog_max_parallel_partial_fetch_rpc);
+  cfg.__set_catalog_partial_fetch_rpc_queue_timeout_s(
+      FLAGS_catalog_partial_fetch_rpc_queue_timeout_s);
   RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &cfg, cfg_bytes));
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/common/thrift/BackendGflags.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 5f971c0..531fc2c 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -103,4 +103,8 @@ struct TBackendGflags {
   38: required i64 kudu_scanner_thread_estimated_bytes_per_column
 
   39: required i64 kudu_scanner_thread_max_estimated_bytes
+
+  40: required i32 catalog_max_parallel_partial_fetch_rpc
+
+  41: required i64 catalog_partial_fetch_rpc_queue_timeout_s
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 3be5f70..b936773 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -732,3 +732,9 @@ struct TCommentOnParams {
   // Name of column to alter.
   4: optional TColumnName column_name
 }
+
+// Response to GetCatalogServerMetrics() call.
+struct TGetCatalogServerMetricsResponse {
+  // Partial fetch RPC queue length.
+  1: required i32 catalog_partial_fetch_rpc_queue_len
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 72fab16..ccc5481 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1890,5 +1890,15 @@
     "units": "TIME_NS",
     "kind": "COUNTER",
     "key": "catalog.cache.total-load-time"
+  },
+  {
+    "description": "RPC queue length for partial object fetches.",
+    "contexts": [
+      "CATALOGSERVER"
+    ],
+    "label": "RPC queue length for partial object fetch requests.",
+    "units": "NONE",
+    "kind": "GAUGE",
+    "key": "catalog.partial-fetch-rpc.queue-len"
   }
 ]

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index ebc9b81..b3c714f 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -235,6 +236,17 @@ public class CatalogServiceCatalog extends Catalog {
   };
   final TopicMode topicMode_;
 
+  private final long PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S = BackendConfig.INSTANCE
+      .getCatalogPartialFetchRpcQueueTimeoutS();
+
+  private final int MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT = BackendConfig.INSTANCE
+      .getCatalogMaxParallelPartialFetchRpc();
+
+  // Controls concurrent access to doGetPartialCatalogObject() call. Limits the number
+  // of parallel requests to --catalog_max_parallel_partial_fetch_rpc.
+  private final Semaphore partialObjectFetchAccess_ =
+      new Semaphore(MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT, /*fair =*/ true);
+
   /**
    * Initialize the CatalogServiceCatalog. If 'loadInBackground' is true, table metadata
    * will be loaded in the background. 'initialHmsCnxnTimeoutSec' specifies the time (in
@@ -270,6 +282,7 @@ public class CatalogServiceCatalog extends Catalog {
         BackendConfig.INSTANCE.getBackendCfg().catalog_topic_mode.toUpperCase());
     catalogdTableInvalidator_ = CatalogdTableInvalidator.create(this,
         BackendConfig.INSTANCE);
+    Preconditions.checkState(PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S > 0);
   }
 
   // Timeout for acquiring a table lock
@@ -387,6 +400,10 @@ public class CatalogServiceCatalog extends Catalog {
     }
   }
 
+  public int getPartialFetchRpcQueueLength() {
+    return partialObjectFetchAccess_.getQueueLength();
+  }
+
   /**
    * Adds a list of cache directive IDs for the given table name. Asynchronously
    * refreshes the table metadata once all cache directives complete.
@@ -2095,13 +2112,59 @@ public class CatalogServiceCatalog extends Catalog {
   }
 
   /**
+   * A wrapper around doGetPartialCatalogObject() that controls the number of concurrent
+   * invocations.
+   */
+  public TGetPartialCatalogObjectResponse getPartialCatalogObject(
+      TGetPartialCatalogObjectRequest req) throws CatalogException {
+    try {
+      if (!partialObjectFetchAccess_.tryAcquire(1,
+          PARTIAL_FETCH_RPC_QUEUE_TIMEOUT_S, TimeUnit.SECONDS)) {
+        // Timed out trying to acquire the semaphore permit.
+        throw new CatalogException("Timed out while fetching partial object metadata. " +
+            "Please check the metric 'catalog.partial-fetch-rpc.queue-len' for the " +
+            "current queue length and consider increasing " +
+            "'catalog_partial_fetch_rpc_queue_timeout_s' and/or " +
+            "'catalog_max_parallel_partial_fetch_rpc'");
+      }
+      // Acquired the permit at this point, should be released before we exit out of
+      // this method.
+      //
+      // Is there a chance that this thread can get interrupted at this point before it
+      // enters the try block, eventually leading to the semaphore permit not
+      // getting released? It can probably happen if the JVM is already in a bad shape.
+      // In the worst case, every permit is blocked and the subsequent requests throw
+      // the timeout exception and the user can monitor the queue metric to see that it
+      // is full, so the issue should be easy to diagnose.
+      // TODO: Figure out if such a race is possible.
+      try {
+        return doGetPartialCatalogObject(req);
+      } finally {
+        partialObjectFetchAccess_.release();
+      }
+    } catch (InterruptedException e) {
+      throw new CatalogException("Error running getPartialCatalogObject(): ", e);
+    }
+  }
+
+  /**
+   * Returns the number of currently running partial RPCs.
+   */
+  @VisibleForTesting
+  public int getConcurrentPartialRpcReqCount() {
+    // Calculated based on number of currently available semaphore permits.
+    return MAX_PARALLEL_PARTIAL_FETCH_RPC_COUNT - partialObjectFetchAccess_
+        .availablePermits();
+  }
+
+  /**
    * Return a partial view of information about a given catalog object. This services
    * the CatalogdMetaProvider running on impalads when they are configured in
    * "local-catalog" mode. If required objects are not present, for example, the database
    * from which a table is requested, the types of the missing objects will be set in the
    * response's lookup_status.
    */
-  public TGetPartialCatalogObjectResponse getPartialCatalogObject(
+  private TGetPartialCatalogObjectResponse doGetPartialCatalogObject(
       TGetPartialCatalogObjectRequest req) throws CatalogException {
     TCatalogObject objectDesc = Preconditions.checkNotNull(req.object_desc,
         "missing object_desc");

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/main/java/org/apache/impala/service/BackendConfig.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 45c75d7..d8a9bc7 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -119,6 +119,14 @@ public class BackendConfig {
     return backendCfg_.local_catalog_max_fetch_retries;
   }
 
+  public int getCatalogMaxParallelPartialFetchRpc() {
+    return backendCfg_.catalog_max_parallel_partial_fetch_rpc;
+  }
+
+  public long getCatalogPartialFetchRpcQueueTimeoutS() {
+    return backendCfg_.catalog_partial_fetch_rpc_queue_timeout_s;
+  }
+
   // Inits the auth_to_local configuration in the static KerberosName class.
   private static void initAuthToLocal() {
     // If auth_to_local is enabled, we read the configuration hadoop.security.auth_to_local

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index b393b55..2a709e8 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -46,6 +46,7 @@ import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TGetCatalogDeltaResponse;
 import org.apache.impala.thrift.TGetCatalogDeltaRequest;
+import org.apache.impala.thrift.TGetCatalogServerMetricsResponse;
 import org.apache.impala.thrift.TGetDbsParams;
 import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
@@ -320,4 +321,12 @@ public class JniCatalog {
     JniUtil.deserializeThrift(protocolFactory_, thriftReq, req);
     catalog_.updateTableUsage(thriftReq);
   }
+
+  public byte[] getCatalogServerMetrics() throws ImpalaException, TException {
+    TGetCatalogServerMetricsResponse response = new TGetCatalogServerMetricsResponse();
+    response.setCatalog_partial_fetch_rpc_queue_len(
+        catalog_.getPartialFetchRpcQueueLength());
+    TSerializer serializer = new TSerializer(protocolFactory_);
+    return serializer.serialize(response);
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
index 6dba475..07d3309 100644
--- a/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/PartialCatalogInfoTest.java
@@ -23,9 +23,19 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import avro.shaded.com.google.common.collect.Lists;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.impala.common.InternalException;
+import org.apache.impala.common.RuntimeEnv;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.thrift.TCatalogInfoSelector;
 import org.apache.impala.thrift.TCatalogObject;
@@ -40,6 +50,8 @@ import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
 import org.apache.thrift.TSerializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableList;
@@ -48,10 +60,26 @@ public class PartialCatalogInfoTest {
   private static CatalogServiceCatalog catalog_ =
       CatalogServiceTestCatalog.create();
 
+  /**
+   * A Callable wrapper around getPartialCatalogObject() call.
+   */
+  private class CallableGetPartialCatalogObjectRequest
+      implements Callable<TGetPartialCatalogObjectResponse> {
+    private final TGetPartialCatalogObjectRequest request_;
+
+    CallableGetPartialCatalogObjectRequest(TGetPartialCatalogObjectRequest request) {
+      request_ = request;
+    }
+
+    @Override
+    public TGetPartialCatalogObjectResponse call() throws Exception {
+      return sendRequest(request_);
+    }
+  }
+
   private TGetPartialCatalogObjectResponse sendRequest(
       TGetPartialCatalogObjectRequest req)
       throws CatalogException, InternalException, TException {
-    System.err.println("req: " + req);
     TGetPartialCatalogObjectResponse resp;
     resp = catalog_.getPartialCatalogObject(req);
     // Round-trip the response through serialization, so if we accidentally forgot to
@@ -59,10 +87,26 @@ public class PartialCatalogInfoTest {
     byte[] respBytes = new TSerializer().serialize(resp);
     resp.clear();
     new TDeserializer().deserialize(resp, respBytes);
-    System.err.println("resp: " + resp);
     return resp;
   }
 
+  /**
+   * Sends the same 'request' from 'requestCount' threads in parallel and waits for
+   * them to finish.
+   */
+  private void sendParallelRequests(TGetPartialCatalogObjectRequest request, int
+      requestCount) throws Exception {
+    Preconditions.checkState(requestCount > 0);
+    final ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(requestCount);
+    final List<Future<TGetPartialCatalogObjectResponse>> tasksToWaitFor =
+        Lists.newArrayList();
+    for (int i = 0; i < requestCount; ++i) {
+      tasksToWaitFor.add(threadPoolExecutor.submit(new
+          CallableGetPartialCatalogObjectRequest(request)));
+    }
+    for (Future task: tasksToWaitFor) task.get();
+  }
+
   @Test
   public void testDbList() throws Exception {
     TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
@@ -179,4 +223,49 @@ public class PartialCatalogInfoTest {
           tle.getMessage());
     }
   }
+
+  @Test
+  public void testConcurrentPartialObjectRequests() throws Exception {
+    // Create a request.
+    TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
+    req.object_desc = new TCatalogObject();
+    req.object_desc.setType(TCatalogObjectType.TABLE);
+    req.object_desc.table = new TTable("functional", "alltypes");
+    req.table_info_selector = new TTableInfoSelector();
+    req.table_info_selector.want_hms_table = true;
+    req.table_info_selector.want_partition_names = true;
+    req.table_info_selector.want_partition_metadata = true;
+
+    // Run 64 concurrent requests and run a tight loop in the background to make sure the
+    // concurrent request count never exceeds 32 (--catalog_partial_rpc_max_parallel_runs)
+    final AtomicBoolean requestsFinished = new AtomicBoolean(false);
+    final int maxParallelRuns = BackendConfig.INSTANCE
+        .getCatalogMaxParallelPartialFetchRpc();
+
+    // Uses a callable<Void> instead of Runnable because junit does not catch exceptions
+    // from threads other than the main thread. Callable here makes sure the exception
+    // is propagated to the main thread.
+    final Callable<Void> assertReqCount = new Callable() {
+      @Override
+      public Void call() throws Exception {
+        while (!requestsFinished.get()) {
+          int currentReqCount = catalog_.getConcurrentPartialRpcReqCount();
+          assertTrue("Invalid concurrent request count: " + currentReqCount,
+              currentReqCount <= maxParallelRuns);
+        }
+        return null;
+      }
+    };
+    Future assertThreadTask;
+    try {
+      // Assert the request count in a tight loop.
+      assertThreadTask = Executors.newSingleThreadExecutor().submit(assertReqCount);
+      sendParallelRequests(req, 64);
+    } finally {
+      requestsFinished.set(true);
+    }
+    // 5 minutes is a reasonable timeout for this test. If timed out, an exception is
+    // thrown.
+    assertThreadTask.get(5, TimeUnit.MINUTES);
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/d48ffc2d/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
index edad74e..3b61028 100644
--- a/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
+++ b/fe/src/test/java/org/apache/impala/testutil/CatalogServiceTestCatalog.java
@@ -19,10 +19,11 @@ package org.apache.impala.testutil;
 
 import org.apache.impala.authorization.SentryConfig;
 import org.apache.impala.catalog.AuthorizationPolicy;
-import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.common.ImpalaException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TUniqueId;
 
 /**