You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jr...@apache.org on 2018/01/19 21:41:36 UTC

[7/8] impala git commit: IMPALA-4886: Expose table metrics in the catalog web UI.

IMPALA-4886: Expose table metrics in the catalog web UI.

The following changes are included in this commit:
* Adds a lightweight framework for registering metrics in the JVM.
* Adds table-level metrics and enables these metrics to be exposed
through the catalog web UI.
* Adds a CatalogUsageMonitor class that monitors and reports the catalog
usage in terms of the tables with the highest memory requirements and
the tables with the highest number of metadata operations. The catalog
usage information is exposed in the /catalog page of the catalog web UI.

Change-Id: I37d407979e6d3b1a444b6b6265900b148facde9e
Reviewed-on: http://gerrit.cloudera.org:8080/8529
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3f00d10e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3f00d10e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3f00d10e

Branch: refs/heads/2.x
Commit: 3f00d10e1b6c5785990c1a73835f82e3821f839e
Parents: d8ae880
Author: Dimitris Tsirogiannis <dt...@cloudera.com>
Authored: Thu Oct 12 16:27:20 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Jan 19 09:25:01 2018 +0000

----------------------------------------------------------------------
 be/src/catalog/catalog-server.cc                | 115 +++++++-
 be/src/catalog/catalog-server.h                 |  29 ++
 be/src/catalog/catalog.cc                       |  16 ++
 be/src/catalog/catalog.h                        |  11 +
 common/thrift/CatalogObjects.thrift             |  11 +
 common/thrift/Frontend.thrift                   |  11 +
 common/thrift/JniCatalog.thrift                 |  22 ++
 fe/pom.xml                                      |   6 +
 .../java/org/apache/impala/catalog/Catalog.java |   7 +-
 .../impala/catalog/CatalogServiceCatalog.java   |  62 ++++-
 .../impala/catalog/CatalogUsageMonitor.java     |  72 +++++
 .../org/apache/impala/catalog/HBaseTable.java   |   6 +
 .../apache/impala/catalog/HdfsPartition.java    |   8 +-
 .../org/apache/impala/catalog/HdfsTable.java    | 262 ++++++++++++++-----
 .../org/apache/impala/catalog/KuduTable.java    |  51 ++--
 .../java/org/apache/impala/catalog/Table.java   |  59 +++++
 .../java/org/apache/impala/common/Metrics.java  | 149 +++++++++++
 .../impala/service/CatalogOpExecutor.java       |  14 +-
 .../org/apache/impala/service/JniCatalog.java   |  19 ++
 .../java/org/apache/impala/util/TopNCache.java  | 108 ++++++++
 .../org/apache/impala/util/TestTopNCache.java   | 130 +++++++++
 tests/webserver/test_web_pages.py               |   9 +
 www/catalog.tmpl                                | 117 ++++++++-
 www/table_metrics.tmpl                          |  23 ++
 24 files changed, 1205 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/be/src/catalog/catalog-server.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index b004b22..4bf26c0 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -55,6 +55,8 @@ const string CATALOG_WEB_PAGE = "/catalog";
 const string CATALOG_TEMPLATE = "catalog.tmpl";
 const string CATALOG_OBJECT_WEB_PAGE = "/catalog_object";
 const string CATALOG_OBJECT_TEMPLATE = "catalog_object.tmpl";
+const string TABLE_METRICS_WEB_PAGE = "/table_metrics";
+const string TABLE_METRICS_TEMPLATE = "table_metrics.tmpl";
 
 // Implementation for the CatalogService thrift interface.
 class CatalogServiceThriftIf : public CatalogServiceIf {
@@ -200,16 +202,14 @@ Status CatalogServer::Start() {
 }
 
 void CatalogServer::RegisterWebpages(Webserver* webserver) {
-  Webserver::UrlCallback catalog_callback =
-      bind<void>(mem_fn(&CatalogServer::CatalogUrlCallback), this, _1, _2);
   webserver->RegisterUrlCallback(CATALOG_WEB_PAGE, CATALOG_TEMPLATE,
-      catalog_callback);
-
-  Webserver::UrlCallback catalog_objects_callback =
-      bind<void>(mem_fn(&CatalogServer::CatalogObjectsUrlCallback), this, _1, _2);
+      [this](const auto& args, auto* doc) { this->CatalogUrlCallback(args, doc); });
   webserver->RegisterUrlCallback(CATALOG_OBJECT_WEB_PAGE, CATALOG_OBJECT_TEMPLATE,
-      catalog_objects_callback, false);
-
+      [this](const auto& args, auto* doc) { this->CatalogObjectsUrlCallback(args, doc); },
+      false);
+  webserver->RegisterUrlCallback(TABLE_METRICS_WEB_PAGE, TABLE_METRICS_TEMPLATE,
+      [this](const auto& args, auto* doc) { this->TableMetricsUrlCallback(args, doc); },
+      false);
   RegisterLogLevelCallbacks(webserver, true);
 }
 
@@ -335,11 +335,12 @@ void CatalogServer::BuildTopicUpdates(const vector<TCatalogObject>& catalog_obje
 
 void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
+  GetCatalogUsage(document);
   TGetDbsResult get_dbs_result;
   Status status = catalog_->GetDbs(NULL, &get_dbs_result);
   if (!status.ok()) {
     Value error(status.GetDetail().c_str(), document->GetAllocator());
-      document->AddMember("error", error, document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
     return;
   }
   Value databases(kArrayType);
@@ -364,15 +365,76 @@ void CatalogServer::CatalogUrlCallback(const Webserver::ArgumentMap& args,
       table_obj.AddMember("fqtn", fq_name, document->GetAllocator());
       Value table_name(table.c_str(), document->GetAllocator());
       table_obj.AddMember("name", table_name, document->GetAllocator());
+      Value has_metrics;
+      has_metrics.SetBool(true);
+      table_obj.AddMember("has_metrics", has_metrics, document->GetAllocator());
       table_array.PushBack(table_obj, document->GetAllocator());
     }
     database.AddMember("num_tables", table_array.Size(), document->GetAllocator());
     database.AddMember("tables", table_array, document->GetAllocator());
+    Value has_metrics;
+    has_metrics.SetBool(true);
+    database.AddMember("has_metrics", has_metrics, document->GetAllocator());
     databases.PushBack(database, document->GetAllocator());
   }
   document->AddMember("databases", databases, document->GetAllocator());
 }
 
+void CatalogServer::GetCatalogUsage(Document* document) {
+  TGetCatalogUsageResponse catalog_usage_result;
+  Status status = catalog_->GetCatalogUsage(&catalog_usage_result);
+  if (!status.ok()) {
+    Value error(status.GetDetail().c_str(), document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+    return;
+  }
+  // Collect information about the largest tables in terms of memory requirements
+  Value large_tables(kArrayType);
+  for (const auto& large_table : catalog_usage_result.large_tables) {
+    Value tbl_obj(kObjectType);
+    // Report the table under its fully qualified "<db>.<table>" name.
+    Value tbl_name(Substitute("$0.$1", large_table.table_name.db_name,
+        large_table.table_name.table_name).c_str(), document->GetAllocator());
+    tbl_obj.AddMember("name", tbl_name, document->GetAllocator());
+    DCHECK(large_table.__isset.memory_estimate_bytes);
+    Value memory_estimate(PrettyPrinter::Print(large_table.memory_estimate_bytes,
+        TUnit::BYTES).c_str(), document->GetAllocator());
+    tbl_obj.AddMember("mem_estimate", memory_estimate, document->GetAllocator());
+    large_tables.PushBack(tbl_obj, document->GetAllocator());
+  }
+  Value has_large_tables;
+  has_large_tables.SetBool(true);
+  document->AddMember("has_large_tables", has_large_tables, document->GetAllocator());
+  document->AddMember("large_tables", large_tables, document->GetAllocator());
+  Value num_large_tables;
+  num_large_tables.SetInt(catalog_usage_result.large_tables.size());
+  document->AddMember("num_large_tables", num_large_tables, document->GetAllocator());
+
+  // Collect information about the most frequently accessed tables.
+  Value frequent_tables(kArrayType);
+  for (const auto& frequent_table : catalog_usage_result.frequently_accessed_tables) {
+    Value tbl_obj(kObjectType);
+    // Same "<db>.<table>" naming as above, paired with the metadata operation count.
+    Value tbl_name(Substitute("$0.$1", frequent_table.table_name.db_name,
+        frequent_table.table_name.table_name).c_str(), document->GetAllocator());
+    tbl_obj.AddMember("name", tbl_name, document->GetAllocator());
+    Value num_metadata_operations;
+    DCHECK(frequent_table.__isset.num_metadata_operations);
+    num_metadata_operations.SetInt64(frequent_table.num_metadata_operations);
+    tbl_obj.AddMember("num_metadata_ops", num_metadata_operations,
+        document->GetAllocator());
+    frequent_tables.PushBack(tbl_obj, document->GetAllocator());
+  }
+  Value has_frequent_tables;
+  has_frequent_tables.SetBool(true);
+  document->AddMember("has_frequent_tables", has_frequent_tables,
+      document->GetAllocator());
+  document->AddMember("frequent_tables", frequent_tables, document->GetAllocator());
+  Value num_frequent_tables;
+  num_frequent_tables.SetInt(catalog_usage_result.frequently_accessed_tables.size());
+  document->AddMember("num_frequent_tables", num_frequent_tables,
+      document->GetAllocator());
+}
 
 void CatalogServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args,
     Document* document) {
@@ -384,7 +446,8 @@ void CatalogServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args
 
     // Get the object type and name from the topic entry key
     TCatalogObject request;
-    Status status = TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
+    Status status =
+        TCatalogObjectFromObjectName(object_type, object_name_arg->second, &request);
 
     // Get the object and dump its contents.
     TCatalogObject result;
@@ -402,3 +465,35 @@ void CatalogServer::CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args
     document->AddMember("error", error, document->GetAllocator());
   }
 }
+
+void CatalogServer::TableMetricsUrlCallback(const Webserver::ArgumentMap& args,
+    Document* document) {
+  // TODO: Enable json view of table metrics
+  Webserver::ArgumentMap::const_iterator object_name_arg = args.find("name");
+  if (object_name_arg != args.end()) {
+    // Split "<db>.<table>" at the first '.'; neither part may be empty.
+    const string& full_tbl_name = object_name_arg->second;
+    const size_t pos = full_tbl_name.find('.');
+    if (pos == string::npos || pos == 0 || pos + 1 >= full_tbl_name.size()) {
+      stringstream error_msg;
+      error_msg << "Invalid table name: " << full_tbl_name;
+      Value error(error_msg.str().c_str(), document->GetAllocator());
+      document->AddMember("error", error, document->GetAllocator());
+      return;
+    }
+    string metrics;
+    Status status = catalog_->GetTableMetrics(
+        full_tbl_name.substr(0, pos), full_tbl_name.substr(pos + 1), &metrics);
+    if (status.ok()) {
+      Value metrics_str(metrics.c_str(), document->GetAllocator());
+      document->AddMember("table_metrics", metrics_str, document->GetAllocator());
+    } else {
+      Value error(status.GetDetail().c_str(), document->GetAllocator());
+      document->AddMember("error", error, document->GetAllocator());
+    }
+  } else {
+    Value error("Please specify the value of the name parameter.",
+        document->GetAllocator());
+    document->AddMember("error", error, document->GetAllocator());
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/be/src/catalog/catalog-server.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog-server.h b/be/src/catalog/catalog-server.h
index 78a3f20..0b6b220 100644
--- a/be/src/catalog/catalog-server.h
+++ b/be/src/catalog/catalog-server.h
@@ -185,6 +185,35 @@ class CatalogServer {
   /// <host>:25020/catalog_objects?object_type=TABLE&object_name=foo.bar
   void CatalogObjectsUrlCallback(const Webserver::ArgumentMap& args,
       rapidjson::Document* document);
+
+  /// Retrieves from the FE information about the current catalog usage and populates
+  /// the /catalog debug webpage. The catalog usage includes information about the TOP-N
+  /// frequently used (in terms of number of metadata operations) tables as well as the
+  /// TOP-N tables with the highest memory requirements.
+  ///
+  /// Example output:
+  /// "large_tables": [
+  ///     {
+  ///       "name": "functional.alltypesagg",
+  ///       "mem_estimate": "202.59 MB"
+  ///     }
+  ///  ]
+  ///  "frequent_tables": [
+  ///      {
+  ///        "name": "functional.alltypestiny",
+  ///        "num_metadata_ops": 10
+  ///      }
+  ///  ]
+  void GetCatalogUsage(rapidjson::Document* document);
+
+  /// Debug webpage handler that is used to dump all the registered metrics of a
+  /// table. The caller specifies the "name" parameter which is the fully
+  /// qualified table name and this function retrieves all the metrics of that
+  /// table. For example, to get the table metrics of table "bar" in database
+  /// "foo":
+  /// <host>:25020/table_metrics?name=foo.bar
+  void TableMetricsUrlCallback(const Webserver::ArgumentMap& args,
+      rapidjson::Document* document);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/be/src/catalog/catalog.cc
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.cc b/be/src/catalog/catalog.cc
index b6dd86a..d96d23e 100644
--- a/be/src/catalog/catalog.cc
+++ b/be/src/catalog/catalog.cc
@@ -58,11 +58,13 @@ Catalog::Catalog() {
     {"execDdl", "([B)[B", &exec_ddl_id_},
     {"resetMetadata", "([B)[B", &reset_metadata_id_},
     {"getTableNames", "([B)[B", &get_table_names_id_},
+    {"getTableMetrics", "([B)Ljava/lang/String;", &get_table_metrics_id_},
     {"getDbs", "([B)[B", &get_dbs_id_},
     {"getFunctions", "([B)[B", &get_functions_id_},
     {"checkUserSentryAdmin", "([B)V", &sentry_admin_check_id_},
     {"getCatalogObject", "([B)[B", &get_catalog_object_id_},
     {"getCatalogDelta", "([B)[B", &get_catalog_delta_id_},
+    {"getCatalogUsage", "()[B", &get_catalog_usage_id_},
     {"getCatalogVersion", "()J", &get_catalog_version_id_},
     {"prioritizeLoad", "([B)V", &prioritize_load_id_}};
 
@@ -131,6 +133,20 @@ Status Catalog::GetTableNames(const string& db, const string* pattern,
   return JniUtil::CallJniMethod(catalog_, get_table_names_id_, params, table_names);
 }
 
+Status Catalog::GetTableMetrics(const string& db, const string& tbl,
+    string* table_metrics) {
+  TGetTableMetricsParams params;
+  TTableName tblName;
+  tblName.__set_db_name(db);
+  tblName.__set_table_name(tbl);
+  params.__set_table_name(tblName);
+  return JniUtil::CallJniMethod(catalog_, get_table_metrics_id_, params, table_metrics);
+}
+
+Status Catalog::GetCatalogUsage(TGetCatalogUsageResponse* response) {
+  return JniUtil::CallJniMethod(catalog_, get_catalog_usage_id_, response);
+}
+
 Status Catalog::GetFunctions(const TGetFunctionsRequest& request,
     TGetFunctionsResponse *response) {
   return JniUtil::CallJniMethod(catalog_, get_functions_id_, request, response);

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/be/src/catalog/catalog.h
----------------------------------------------------------------------
diff --git a/be/src/catalog/catalog.h b/be/src/catalog/catalog.h
index 3119d60..13e4529 100644
--- a/be/src/catalog/catalog.h
+++ b/be/src/catalog/catalog.h
@@ -84,6 +84,15 @@ class Catalog {
   Status GetTableNames(const std::string& db, const std::string* pattern,
       TGetTablesResult* table_names);
 
+  /// Returns the collected metrics of a table. The response contains a
+  /// pretty-printed string representation of table metrics.
+  Status GetTableMetrics(const std::string& db, const std::string& tbl,
+      std::string* metrics);
+
+  /// Returns the current catalog usage that includes the most frequently accessed
+  /// tables as well as the tables with the highest memory requirements.
+  Status GetCatalogUsage(TGetCatalogUsageResponse* response);
+
   /// Gets all functions in the catalog matching the parameters in the given
   /// TFunctionsRequest.
   Status GetFunctions(const TGetFunctionsRequest& request,
@@ -109,8 +118,10 @@ class Catalog {
   jmethodID get_catalog_object_id_;  // JniCatalog.getCatalogObject()
   jmethodID get_catalog_delta_id_;  // JniCatalog.getCatalogDelta()
   jmethodID get_catalog_version_id_;  // JniCatalog.getCatalogVersion()
+  jmethodID get_catalog_usage_id_; // JniCatalog.getCatalogUsage()
   jmethodID get_dbs_id_; // JniCatalog.getDbs()
   jmethodID get_table_names_id_; // JniCatalog.getTableNames()
+  jmethodID get_table_metrics_id_; // JniCatalog.getTableMetrics()
   jmethodID get_functions_id_; // JniCatalog.getFunctions()
   jmethodID prioritize_load_id_; // JniCatalog.prioritizeLoad()
   jmethodID sentry_admin_check_id_; // JniCatalog.checkUserSentryAdmin()

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 7894f75..0f1ea5d 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -266,6 +266,17 @@ struct THdfsPartition {
 
   // (key,value) pairs stored in the Hive Metastore.
   15: optional map<string, string> hms_parameters
+
+  // The following fields store stats about this partition
+  // which are collected when toThrift() is called.
+  // Total number of blocks in this partition.
+  16: optional i64 num_blocks
+
+  // Total file size in bytes of this partition.
+  17: optional i64 total_file_size_bytes
+
+  // True, if this partition has incremental stats
+  18: optional bool has_incremental_stats
 }
 
 struct THdfsTable {

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index f856871..ba21605 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -87,6 +87,17 @@ struct TGetTablesResult {
   1: list<string> tables
 }
 
+// Arguments to getTableMetrics, which returns the metrics of a specific table.
+struct TGetTableMetricsParams {
+  1: required CatalogObjects.TTableName table_name
+}
+
+// Response to a getTableMetrics request. The response contains all the collected metrics
+// pretty-printed into a string.
+struct TGetTableMetricsResponse {
+  1: required string metrics
+}
+
 // Arguments to getDbs, which returns a list of dbs that match an optional pattern
 struct TGetDbsParams {
   // If not set, match every database

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/common/thrift/JniCatalog.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/JniCatalog.thrift b/common/thrift/JniCatalog.thrift
index 939e276..4edf4d2 100644
--- a/common/thrift/JniCatalog.thrift
+++ b/common/thrift/JniCatalog.thrift
@@ -605,3 +605,25 @@ struct TDropFunctionParams {
   // the signature generated by the FE should just be plumbed through).
   4: optional string signature
 }
+
+// Stores metrics of a catalog table.
+struct TTableUsageMetrics {
+  1: required CatalogObjects.TTableName table_name
+
+  // Estimated memory usage of that table.
+  2: optional i64 memory_estimate_bytes
+
+  // Number of metadata operations performed on the table since it was loaded.
+  3: optional i64 num_metadata_operations
+}
+
+// Response to a GetCatalogUsage request.
+struct TGetCatalogUsageResponse{
+  // List of the largest (in terms of memory requirements) tables.
+  1: required list<TTableUsageMetrics> large_tables
+
+  // List of the most frequently accessed (in terms of number of metadata operations)
+  // tables.
+  2: required list<TTableUsageMetrics> frequently_accessed_tables
+}
+

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/pom.xml
----------------------------------------------------------------------
diff --git a/fe/pom.xml b/fe/pom.xml
index 646820f..135ec77 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -312,6 +312,12 @@ under the License.
       <version>1.6.0.1</version>
     </dependency>
 
+    <dependency>
+      <groupId>io.dropwizard.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+      <version>3.2.2</version>
+    </dependency>
+
   </dependencies>
 
   <reporting>

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/Catalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Catalog.java b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
index 9ed8133..4548c2b 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Catalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Catalog.java
@@ -35,6 +35,7 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
 import org.apache.impala.util.PatternMatcher;
+
 import org.apache.log4j.Logger;
 
 /**
@@ -167,7 +168,11 @@ public abstract class Catalog {
     // Remove the old table name from the cache and add the new table.
     Db db = getDb(tableName.getDb_name());
     if (db == null) return null;
-    return db.removeTable(tableName.getTable_name());
+    Table tbl = db.removeTable(tableName.getTable_name());
+    if (tbl != null && !tbl.isStoredInImpaladCatalogCache()) {
+      CatalogUsageMonitor.INSTANCE.removeTable(tbl);
+    }
+    return tbl;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index f75b0a8..8f75a16 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -63,10 +63,12 @@ import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TCatalogUpdateResult;
 import org.apache.impala.thrift.TFunction;
 import org.apache.impala.thrift.TGetCatalogDeltaResponse;
+import org.apache.impala.thrift.TGetCatalogUsageResponse;
 import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TPrivilege;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableName;
+import org.apache.impala.thrift.TTableUsageMetrics;
 import org.apache.impala.thrift.TUniqueId;
 import org.apache.impala.util.PatternMatcher;
 import org.apache.impala.util.SentryProxy;
@@ -74,6 +76,7 @@ import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TCompactProtocol;
 
+import com.codahale.metrics.Timer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -1434,11 +1437,12 @@ public class CatalogServiceCatalog extends Catalog {
     Preconditions.checkState(!(tbl instanceof IncompleteTable));
     String dbName = tbl.getDb().getName();
     String tblName = tbl.getName();
-
     if (!tryLockTable(tbl)) {
       throw new CatalogException(String.format("Error refreshing metadata for table " +
           "%s due to lock contention", tbl.getFullName()));
     }
+    final Timer.Context context =
+        tbl.getMetrics().getTimer(Table.REFRESH_DURATION_METRIC).time();
     try {
       long newCatalogVersion = incrementAndGetCatalogVersion();
       versionLock_.writeLock().unlock();
@@ -1456,6 +1460,7 @@ public class CatalogServiceCatalog extends Catalog {
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));
       return tbl.toTCatalogObject();
     } finally {
+      context.stop();
       Preconditions.checkState(!versionLock_.isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
     }
@@ -1904,4 +1909,59 @@ public class CatalogServiceCatalog extends Catalog {
     }
     return versionToWaitFor;
   }
+
+  /**
+   * Retrieves information about the current catalog usage including the most frequently
+   * accessed tables as well as the tables with the highest memory requirements.
+   */
+  public TGetCatalogUsageResponse getCatalogUsage() {
+    TGetCatalogUsageResponse usage = new TGetCatalogUsageResponse();
+    usage.setLarge_tables(Lists.<TTableUsageMetrics>newArrayList());
+    usage.setFrequently_accessed_tables(Lists.<TTableUsageMetrics>newArrayList());
+    for (Table largeTable: CatalogUsageMonitor.INSTANCE.getLargestTables()) {
+      TTableUsageMetrics tableUsageMetrics =
+          new TTableUsageMetrics(largeTable.getTableName().toThrift());
+      tableUsageMetrics.setMemory_estimate_bytes(largeTable.getEstimatedMetadataSize());
+      usage.addToLarge_tables(tableUsageMetrics);
+    }
+    for (Table frequentTable:
+        CatalogUsageMonitor.INSTANCE.getFrequentlyAccessedTables()) {
+      TTableUsageMetrics tableUsageMetrics =
+          new TTableUsageMetrics(frequentTable.getTableName().toThrift());
+      tableUsageMetrics.setNum_metadata_operations(frequentTable.getMetadataOpsCount());
+      usage.addToFrequently_accessed_tables(tableUsageMetrics);
+    }
+    return usage;
+  }
+
+  /**
+   * Retrieves the stored metrics of the specified table and returns a pretty-printed
+   * string representation. Throws a CatalogException if the table does not exist. If
+   * the table is not yet loaded, or another operation currently holds the table lock,
+   * a human-readable explanation is returned instead of the metrics.
+   */
+  public String getTableMetrics(TTableName tTableName) throws CatalogException {
+    String dbName = tTableName.db_name;
+    String tblName = tTableName.table_name;
+    Table tbl = getTable(dbName, tblName);
+    if (tbl == null) {
+      throw new CatalogException("Table " + dbName + "." + tblName + " was not found.");
+    }
+    String result;
+    if (tbl instanceof IncompleteTable) {
+      result = "No metrics available for table " + dbName + "." + tblName +
+          ". Table not yet loaded.";
+      return result;
+    }
+    if (!tbl.getLock().tryLock()) {
+      result = "Metrics for table " + dbName + "." + tblName + " are not available " +
+          "because the table is currently modified by another operation.";
+      return result;
+    }
+    try {
+      return tbl.getMetrics().toString();
+    } finally {
+      tbl.getLock().unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/CatalogUsageMonitor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogUsageMonitor.java b/fe/src/main/java/org/apache/impala/catalog/CatalogUsageMonitor.java
new file mode 100644
index 0000000..a2e8d7e
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogUsageMonitor.java
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.catalog;
+
+import java.util.List;
+
+import org.apache.impala.util.TopNCache;
+
+import com.google.common.base.Function;
+
+/**
+ * Singleton class that monitors catalog usage. Currently, it tracks the most
+ * frequently accessed tables (in terms of number of metadata operations) as well as
+ * the tables with the highest (estimated) memory requirements. This class is
+ * thread-safe.
+ */
+public final class CatalogUsageMonitor {
+
+  public final static CatalogUsageMonitor INSTANCE = new CatalogUsageMonitor();
+
+  private final TopNCache<Table, Long> frequentlyAccessedTables_;
+
+  private final TopNCache<Table, Long> largestTables_;
+
+  private CatalogUsageMonitor() {
+    final int num_tables_tracked = Integer.getInteger(
+        "org.apache.impala.catalog.CatalogUsageMonitor.NUM_TABLES_TRACKED", 25);
+    frequentlyAccessedTables_ = new TopNCache<Table, Long>(
+        new Function<Table, Long>() {
+          @Override
+          public Long apply(Table tbl) { return tbl.getMetadataOpsCount(); }
+        }, num_tables_tracked, true);
+
+    largestTables_ = new TopNCache<Table, Long>(
+        new Function<Table, Long>() {
+          @Override
+          public Long apply(Table tbl) { return tbl.getEstimatedMetadataSize(); }
+        }, num_tables_tracked, false);
+  }
+
+  public void updateFrequentlyAccessedTables(Table tbl) {
+    frequentlyAccessedTables_.putOrUpdate(tbl);
+  }
+
+  public void updateLargestTables(Table tbl) { largestTables_.putOrUpdate(tbl); }
+
+  public void removeTable(Table tbl) {
+    frequentlyAccessedTables_.remove(tbl);
+    largestTables_.remove(tbl);
+  }
+
+  public List<Table> getFrequentlyAccessedTables() {
+    return frequentlyAccessedTables_.listEntries();
+  }
+
+  public List<Table> getLargestTables() { return largestTables_.listEntries(); }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
index 6df7c28..cf36a89 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HBaseTable.java
@@ -64,6 +64,8 @@ import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.StatsHelper;
 import org.apache.impala.util.TResultRowBuilder;
+
+import com.codahale.metrics.Timer;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -321,6 +323,8 @@ public class HBaseTable extends Table {
   public void load(boolean reuseMetadata, IMetaStoreClient client,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
     Preconditions.checkNotNull(getMetaStoreTable());
+    final Timer.Context context =
+        getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     try {
       msTable_ = msTbl;
       hbaseTableName_ = getHBaseTableName(getMetaStoreTable());
@@ -414,6 +418,8 @@ public class HBaseTable extends Table {
     } catch (Exception e) {
       throw new TableLoadingException("Failed to load metadata for HBase table: " +
           name_, e);
+    } finally {
+      context.stop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index e78ce92..2179346 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -931,6 +931,7 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
     thriftHdfsPart.setAccess_level(accessLevel_);
     thriftHdfsPart.setIs_marked_cached(isMarkedCached_);
     thriftHdfsPart.setId(getId());
+    thriftHdfsPart.setHas_incremental_stats(hasIncrementalStats());
     // IMPALA-4902: Shallow-clone the map to avoid concurrent modifications. One thread
     // may try to serialize the returned THdfsPartition after releasing the table's lock,
     // and another thread doing DDL may modify the map.
@@ -938,11 +939,16 @@ public class HdfsPartition implements Comparable<HdfsPartition> {
         includeIncrementalStats ? hmsParameters_ : getFilteredHmsParameters()));
     if (includeFileDesc) {
       // Add block location information
+      long numBlocks = 0;
+      long totalFileBytes = 0;
       for (FileDescriptor fd: fileDescriptors_) {
         thriftHdfsPart.addToFile_desc(fd.toThrift());
+        numBlocks += fd.getNumFileBlocks();
+        totalFileBytes += fd.getFileLength();
       }
+      thriftHdfsPart.setNum_blocks(numBlocks);
+      thriftHdfsPart.setTotal_file_size_bytes(totalFileBytes);
     }
-
     return thriftHdfsPart;
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 4de25fe..04599f5 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -95,6 +95,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Timer;
+
 /**
  * Internal representation of table-related metadata of a file-resident table on a
  * Hadoop filesystem. The table data can be accessed through libHDFS (which is more of
@@ -124,6 +127,24 @@ public class HdfsTable extends Table {
   // Table property key for skip.header.line.count
   public static final String TBL_PROP_SKIP_HEADER_LINE_COUNT = "skip.header.line.count";
 
+  // Average memory requirements (in bytes) for storing the metadata of a partition.
+  private static final long PER_PARTITION_MEM_USAGE_BYTES = 2048;
+
+  // Average memory requirements (in bytes) for storing a file descriptor.
+  private static final long PER_FD_MEM_USAGE_BYTES = 500;
+
+  // Average memory requirements (in bytes) for storing a block.
+  private static final long PER_BLOCK_MEM_USAGE_BYTES = 150;
+
+  // Hdfs table specific metrics
+  public static final String CATALOG_UPDATE_DURATION_METRIC = "catalog-update-duration";
+  public static final String NUM_PARTITIONS_METRIC = "num-partitions";
+  public static final String NUM_FILES_METRIC = "num-files";
+  public static final String NUM_BLOCKS_METRIC = "num-blocks";
+  public static final String TOTAL_FILE_BYTES_METRIC = "total-file-size-bytes";
+  public static final String MEMORY_ESTIMATE_METRIC = "memory-estimate-bytes";
+  public static final String HAS_INCREMENTAL_STATS_METRIC = "has-incremental-stats";
+
   // string to indicate NULL. set in load() from table properties
   private String nullColumnValue_;
 
@@ -172,19 +193,14 @@ public class HdfsTable extends Table {
   // replicas of the block.
   private final ListMap<TNetworkAddress> hostIndex_ = new ListMap<TNetworkAddress>();
 
-  private HdfsPartitionLocationCompressor partitionLocationCompressor_;
-
-  // Total number of Hdfs files in this table. Accounted only in the Impalad catalog
-  // cache. Set to -1 on Catalogd.
-  private long numHdfsFiles_;
-
-  // Sum of sizes of all Hdfs files in this table. Accounted only in the Impalad
-  // catalog cache. Set to -1 on Catalogd.
-  private long totalHdfsBytes_;
+  // True iff this table has incremental stats in any of its partitions.
+  private boolean hasIncrementalStats_ = false;
 
   // True iff the table's partitions are located on more than one filesystem.
   private boolean multipleFileSystems_ = false;
 
+  private HdfsPartitionLocationCompressor partitionLocationCompressor_;
+
   // Base Hdfs directory where files of this table are stored.
   // For unpartitioned tables it is simply the path where all files live.
   // For partitioned tables it is the root directory
@@ -200,6 +216,50 @@ public class HdfsTable extends Table {
   // for setAvroSchema().
   private boolean isSchemaLoaded_ = false;
 
+  // Represents a set of storage-related statistics aggregated at the table or partition
+  // level.
+  public final static class FileMetadataStats {
+    // Number of files in a table/partition.
+    public long numFiles;
+    // Number of blocks in a table/partition.
+    public long numBlocks;
+    // Total size (in bytes) of all files in a table/partition.
+    public long totalFileBytes;
+
+    // Unsets the storage stats to indicate that their values are unknown.
+    public void unset() {
+      numFiles = -1;
+      numBlocks = -1;
+      totalFileBytes = -1;
+    }
+
+    // Initializes the values of the storage stats.
+    public void init() {
+      numFiles = 0;
+      numBlocks = 0;
+      totalFileBytes = 0;
+    }
+
+    public void set(FileMetadataStats stats) {
+      numFiles = stats.numFiles;
+      numBlocks = stats.numBlocks;
+      totalFileBytes = stats.totalFileBytes;
+    }
+  }
+
+  // Table level storage-related statistics. Depending on whether the table is stored in
+  // the catalog server or the impalad catalog cache, these statistics serve different
+  // purposes and, hence, are managed differently.
+  // Table stored in impalad catalog cache:
+  //   - Used in planning.
+  //   - Stats are modified real-time by the operations that modify table metadata
+  //   (e.g. add partition).
+  // Table stored in the catalog server:
+  //   - Used for reporting through catalog web UI.
+  //   - Stats are reset whenever the table is loaded (due to a metadata operation) and
+  //   are set when the table is serialized to Thrift.
+  private FileMetadataStats fileMetadataStats_ = new FileMetadataStats();
+
   private final static Logger LOG = LoggerFactory.getLogger(HdfsTable.class);
 
   // Caching this configuration object makes calls to getFileSystem much quicker
@@ -311,17 +371,17 @@ public class HdfsTable extends Table {
   }
 
   /**
-   * Updates numHdfsFiles_ and totalHdfsBytes_ based on the partition information.
+   * Updates the storage stats of this table based on the partition information.
    * This is used only for the frontend tests that do not spawn a separate Catalog
    * instance.
    */
   public void computeHdfsStatsForTesting() {
-    Preconditions.checkState(numHdfsFiles_ == -1 && totalHdfsBytes_ == -1);
-    numHdfsFiles_ = 0;
-    totalHdfsBytes_ = 0;
+    Preconditions.checkState(fileMetadataStats_.numFiles == -1
+        && fileMetadataStats_.totalFileBytes == -1);
+    fileMetadataStats_.init();
     for (HdfsPartition partition: partitionMap_.values()) {
-      numHdfsFiles_ += partition.getNumFileDescriptors();
-      totalHdfsBytes_ += partition.getSize();
+      fileMetadataStats_.numFiles += partition.getNumFileDescriptors();
+      fileMetadataStats_.totalFileBytes += partition.getSize();
     }
   }
 
@@ -681,8 +741,7 @@ public class HdfsTable extends Table {
         nullPartitionIds_.add(Sets.<Long>newHashSet());
       }
     }
-    numHdfsFiles_ = 0;
-    totalHdfsBytes_ = 0;
+    fileMetadataStats_.init();
   }
 
   /**
@@ -1023,8 +1082,8 @@ public class HdfsTable extends Table {
     }
     if (partition.getFileFormat() == HdfsFileFormat.AVRO) hasAvroData_ = true;
     partitionMap_.put(partition.getId(), partition);
-    totalHdfsBytes_ += partition.getSize();
-    numHdfsFiles_ += partition.getNumFileDescriptors();
+    fileMetadataStats_.totalFileBytes += partition.getSize();
+    fileMetadataStats_.numFiles += partition.getNumFileDescriptors();
     updatePartitionMdAndColStats(partition);
   }
 
@@ -1078,8 +1137,8 @@ public class HdfsTable extends Table {
    */
   private HdfsPartition dropPartition(HdfsPartition partition) {
     if (partition == null) return null;
-    totalHdfsBytes_ -= partition.getSize();
-    numHdfsFiles_ -= partition.getNumFileDescriptors();
+    fileMetadataStats_.totalFileBytes -= partition.getSize();
+    fileMetadataStats_.numFiles -= partition.getNumFileDescriptors();
     Preconditions.checkArgument(partition.getPartitionValues().size() ==
         numClusteringCols_);
     Long partitionId = partition.getId();
@@ -1176,49 +1235,54 @@ public class HdfsTable extends Table {
       org.apache.hadoop.hive.metastore.api.Table msTbl,
       boolean loadParitionFileMetadata, boolean loadTableSchema,
       Set<String> partitionsToUpdate) throws TableLoadingException {
-    // turn all exceptions into TableLoadingException
-    msTable_ = msTbl;
+    final Timer.Context context =
+        getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     try {
-      if (loadTableSchema) loadSchema(client, msTbl);
-      if (reuseMetadata && getCatalogVersion() == Catalog.INITIAL_CATALOG_VERSION) {
-        // This is the special case of CTAS that creates a 'temp' table that does not
-        // actually exist in the Hive Metastore.
-        initializePartitionMetadata(msTbl);
-        setTableStats(msTbl);
-        return;
-      }
-      // Load partition and file metadata
-      if (reuseMetadata) {
-        // Incrementally update this table's partitions and file metadata
-        LOG.info("Incrementally loading table metadata for: " + getFullName());
-        Preconditions.checkState(
-            partitionsToUpdate == null || loadParitionFileMetadata);
-        updateMdFromHmsTable(msTbl);
-        if (msTbl.getPartitionKeysSize() == 0) {
-          if (loadParitionFileMetadata) updateUnpartitionedTableFileMd();
+      // turn all exceptions into TableLoadingException
+      msTable_ = msTbl;
+      try {
+        if (loadTableSchema) loadSchema(client, msTbl);
+        if (reuseMetadata && getCatalogVersion() == Catalog.INITIAL_CATALOG_VERSION) {
+          // This is the special case of CTAS that creates a 'temp' table that does not
+          // actually exist in the Hive Metastore.
+          initializePartitionMetadata(msTbl);
+          setTableStats(msTbl);
+          return;
+        }
+        // Load partition and file metadata
+        if (reuseMetadata) {
+          // Incrementally update this table's partitions and file metadata
+          LOG.info("Incrementally loading table metadata for: " + getFullName());
+          Preconditions.checkState(
+              partitionsToUpdate == null || loadParitionFileMetadata);
+          updateMdFromHmsTable(msTbl);
+          if (msTbl.getPartitionKeysSize() == 0) {
+            if (loadParitionFileMetadata) updateUnpartitionedTableFileMd();
+          } else {
+            updatePartitionsFromHms(
+                client, partitionsToUpdate, loadParitionFileMetadata);
+          }
+          LOG.info("Incrementally loaded table metadata for: " + getFullName());
         } else {
-          updatePartitionsFromHms(
-              client, partitionsToUpdate, loadParitionFileMetadata);
+          // Load all partitions from Hive Metastore, including file metadata.
+          LOG.info("Fetching partition metadata from the Metastore: " + getFullName());
+          List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
+              MetaStoreUtil.fetchAllPartitions(
+                  client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
+          LOG.info("Fetched partition metadata from the Metastore: " + getFullName());
+          loadAllPartitions(msPartitions, msTbl);
         }
-        LOG.info("Incrementally loaded table metadata for: " + getFullName());
-      } else {
-        // Load all partitions from Hive Metastore, including file metadata.
-        LOG.info("Fetching partition metadata from the Metastore: " + getFullName());
-        List<org.apache.hadoop.hive.metastore.api.Partition> msPartitions =
-            MetaStoreUtil.fetchAllPartitions(
-                client, db_.getName(), name_, NUM_PARTITION_FETCH_RETRIES);
-        LOG.info("Fetched partition metadata from the Metastore: " + getFullName());
-        loadAllPartitions(msPartitions, msTbl);
+        if (loadTableSchema) setAvroSchema(client, msTbl);
+        setTableStats(msTbl);
+        fileMetadataStats_.unset();
+      } catch (TableLoadingException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new TableLoadingException("Failed to load metadata for table: "
+            + getFullName(), e);
       }
-      if (loadTableSchema) setAvroSchema(client, msTbl);
-      setTableStats(msTbl);
-      numHdfsFiles_ = -1;
-      totalHdfsBytes_ = -1;
-    } catch (TableLoadingException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new TableLoadingException("Failed to load metadata for table: "
-          + getFullName(), e);
+    } finally {
+      context.stop();
     }
   }
 
@@ -1648,25 +1712,49 @@ public class HdfsTable extends Table {
    * partitions). To prevent the catalog from hitting an OOM error while trying to
    * serialize large partition incremental stats, we estimate the stats size and filter
    * the incremental stats data from partition objects if the estimate exceeds
-   * --inc_stats_size_limit_bytes
+   * --inc_stats_size_limit_bytes. This function also collects storage related statistics
+   *  (e.g. number of blocks, files, etc) in order to compute an estimate of the metadata
+   *  size of this table.
    */
   private THdfsTable getTHdfsTable(boolean includeFileDesc, Set<Long> refPartitions) {
     // includeFileDesc implies all partitions should be included (refPartitions == null).
     Preconditions.checkState(!includeFileDesc || refPartitions == null);
+    long memUsageEstimate = 0;
     int numPartitions =
         (refPartitions == null) ? partitionMap_.values().size() : refPartitions.size();
+    memUsageEstimate += numPartitions * PER_PARTITION_MEM_USAGE_BYTES;
     long statsSizeEstimate =
         numPartitions * getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
     boolean includeIncrementalStats =
         (statsSizeEstimate < BackendConfig.INSTANCE.getIncStatsMaxSize());
+    FileMetadataStats stats = new FileMetadataStats();
     Map<Long, THdfsPartition> idToPartition = Maps.newHashMap();
     for (HdfsPartition partition: partitionMap_.values()) {
       long id = partition.getId();
       if (refPartitions == null || refPartitions.contains(id)) {
-        idToPartition.put(id,
-            partition.toThrift(includeFileDesc, includeIncrementalStats));
+        THdfsPartition tHdfsPartition =
+            partition.toThrift(includeFileDesc, includeIncrementalStats);
+        if (tHdfsPartition.isSetHas_incremental_stats() &&
+            tHdfsPartition.isHas_incremental_stats()) {
+          memUsageEstimate += getColumns().size() * STATS_SIZE_PER_COLUMN_BYTES;
+          hasIncrementalStats_ = true;
+        }
+        if (includeFileDesc) {
+          Preconditions.checkState(tHdfsPartition.isSetNum_blocks() &&
+              tHdfsPartition.isSetTotal_file_size_bytes());
+          stats.numBlocks += tHdfsPartition.getNum_blocks();
+          stats.numFiles +=
+              tHdfsPartition.isSetFile_desc() ? tHdfsPartition.getFile_desc().size() : 0;
+          stats.totalFileBytes += tHdfsPartition.getTotal_file_size_bytes();
+        }
+        idToPartition.put(id, tHdfsPartition);
       }
     }
+    if (includeFileDesc) fileMetadataStats_.set(stats);
+
+    memUsageEstimate += fileMetadataStats_.numFiles * PER_FD_MEM_USAGE_BYTES +
+        fileMetadataStats_.numBlocks * PER_BLOCK_MEM_USAGE_BYTES;
+    setEstimatedMetadataSize(memUsageEstimate);
     THdfsTable hdfsTable = new THdfsTable(hdfsBaseDir_, getColumnNames(),
         nullPartitionKeyValue_, nullColumnValue_, idToPartition);
     hdfsTable.setAvroSchema(avroSchema_);
@@ -1680,7 +1768,7 @@ public class HdfsTable extends Table {
     return hdfsTable;
   }
 
-  public long getTotalHdfsBytes() { return totalHdfsBytes_; }
+  public long getTotalHdfsBytes() { return fileMetadataStats_.totalFileBytes; }
   public String getHdfsBaseDir() { return hdfsBaseDir_; }
   public Path getHdfsBaseDirPath() { return new Path(hdfsBaseDir_); }
   public boolean isAvroTable() { return avroSchema_ != null; }
@@ -1978,8 +2066,11 @@ public class HdfsTable extends Table {
       // Compute and report the extrapolated row count because the set of files could
       // have changed since we last computed stats for this partition. We also follow
       // this policy during scan-cardinality estimation.
-      if (statsExtrap) rowBuilder.add(getExtrapolatedNumRows(totalHdfsBytes_));
-      rowBuilder.add(numHdfsFiles_).addBytes(totalHdfsBytes_)
+      if (statsExtrap) {
+        rowBuilder.add(getExtrapolatedNumRows(fileMetadataStats_.totalFileBytes));
+      }
+      rowBuilder.add(fileMetadataStats_.numFiles)
+          .addBytes(fileMetadataStats_.totalFileBytes)
           .addBytes(totalCachedBytes).add("").add("").add("").add("");
       result.addToRows(rowBuilder.get());
     }
@@ -2072,13 +2163,13 @@ public class HdfsTable extends Table {
     // Conservative max size for Java arrays. The actual maximum varies
     // from JVM version and sometimes between configurations.
     final long JVM_MAX_ARRAY_SIZE = Integer.MAX_VALUE - 10;
-    if (numHdfsFiles_ > JVM_MAX_ARRAY_SIZE) {
+    if (fileMetadataStats_.numFiles > JVM_MAX_ARRAY_SIZE) {
       throw new IllegalStateException(String.format(
           "Too many files to generate a table sample. " +
           "Table '%s' has %s files, but a maximum of %s files are supported.",
-          getTableName().toString(), numHdfsFiles_, JVM_MAX_ARRAY_SIZE));
+          getTableName().toString(), fileMetadataStats_.numFiles, JVM_MAX_ARRAY_SIZE));
     }
-    int totalNumFiles = (int) numHdfsFiles_;
+    int totalNumFiles = (int) fileMetadataStats_.numFiles;
 
     // Ensure a consistent ordering of files for repeatable runs. The files within a
     // partition are already ordered based on how they are loaded in the catalog.
@@ -2134,4 +2225,37 @@ public class HdfsTable extends Table {
     }
     return result;
   }
+
+  /**
+   * Registers table metrics.
+   */
+  @Override
+  public void initMetrics() {
+    super.initMetrics();
+    metrics_.addGauge(NUM_PARTITIONS_METRIC, new Gauge<Integer>() {
+      @Override
+      public Integer getValue() { return partitionMap_.values().size(); }
+    });
+    metrics_.addGauge(NUM_FILES_METRIC, new Gauge<Long>() {
+      @Override
+      public Long getValue() { return fileMetadataStats_.numFiles; }
+    });
+    metrics_.addGauge(NUM_BLOCKS_METRIC, new Gauge<Long>() {
+      @Override
+      public Long getValue() { return fileMetadataStats_.numBlocks; }
+    });
+    metrics_.addGauge(TOTAL_FILE_BYTES_METRIC, new Gauge<Long>() {
+      @Override
+      public Long getValue() { return fileMetadataStats_.totalFileBytes; }
+    });
+    metrics_.addGauge(MEMORY_ESTIMATE_METRIC, new Gauge<Long>() {
+      @Override
+      public Long getValue() { return getEstimatedMetadataSize(); }
+    });
+    metrics_.addGauge(HAS_INCREMENTAL_STATS_METRIC, new Gauge<Boolean>() {
+      @Override
+      public Boolean getValue() { return hasIncrementalStats_; }
+    });
+    metrics_.addTimer(CATALOG_UPDATE_DURATION_METRIC);
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 7e13ac5..e9e1617 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -58,6 +58,7 @@ import org.apache.kudu.client.PartitionSchema.RangeSchema;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
+import com.codahale.metrics.Timer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -215,30 +216,36 @@ public class KuduTable extends Table {
   @Override
   public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
       org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
-    msTable_ = msTbl;
-    kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
-    Preconditions.checkNotNull(kuduTableName_);
-    kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
-    Preconditions.checkNotNull(kuduMasters_);
-    setTableStats(msTable_);
-    // Load metadata from Kudu and HMS
+    final Timer.Context context =
+        getMetrics().getTimer(Table.LOAD_DURATION_METRIC).time();
     try {
-      loadSchemaFromKudu();
-      loadAllColumnStats(msClient);
-    } catch (ImpalaRuntimeException e) {
-      throw new TableLoadingException("Error loading metadata for Kudu table " +
-          kuduTableName_, e);
-    }
+      msTable_ = msTbl;
+      kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
+      Preconditions.checkNotNull(kuduTableName_);
+      kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+      Preconditions.checkNotNull(kuduMasters_);
+      setTableStats(msTable_);
+      // Load metadata from Kudu and HMS
+      try {
+        loadSchemaFromKudu();
+        loadAllColumnStats(msClient);
+      } catch (ImpalaRuntimeException e) {
+        throw new TableLoadingException("Error loading metadata for Kudu table " +
+            kuduTableName_, e);
+      }
 
-    // Update the table schema in HMS.
-    try {
-      long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_);
-      msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
-      msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
-          StatsSetupConst.TRUE);
-      msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);
-    } catch (TException e) {
-      throw new TableLoadingException(e.getMessage());
+      // Update the table schema in HMS.
+      try {
+        long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_);
+        msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
+        msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
+            StatsSetupConst.TRUE);
+        msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);
+      } catch (TException e) {
+        throw new TableLoadingException(e.getMessage());
+      }
+    } finally {
+      context.stop();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 50fe953..a6536ba 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.impala.analysis.TableName;
 import org.apache.impala.common.ImpalaRuntimeException;
+import org.apache.impala.common.Metrics;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.RuntimeEnv;
 import org.apache.impala.thrift.TAccessLevel;
@@ -73,6 +75,18 @@ public abstract class Table extends CatalogObjectImpl {
   // values of -1 indicate an unknown statistic.
   protected TTableStats tableStats_;
 
+  // Estimated size (in bytes) of this table metadata. Stored in an AtomicLong to allow
+  // this field to be accessed without holding the table lock.
+  protected AtomicLong estimatedMetadataSize_ = new AtomicLong(0);
+
+  // Number of metadata operations performed on that table since it was loaded.
+  // Stored in an AtomicLong to allow this field to be accessed without holding the
+  // table lock.
+  protected AtomicLong metadataOpsCount_ = new AtomicLong(0);
+
+  // Metrics for this table
+  protected final Metrics metrics_ = new Metrics();
+
   // colsByPos[i] refers to the ith column in the table. The first numClusteringCols are
   // the clustering columns.
   protected final ArrayList<Column> colsByPos_ = Lists.newArrayList();
@@ -89,6 +103,12 @@ public abstract class Table extends CatalogObjectImpl {
   // True if this object is stored in an Impalad catalog cache.
   protected boolean storedInImpaladCatalogCache_ = false;
 
+  // Table metrics. These metrics are applicable to all table types. Each subclass of
+  // Table can define additional metrics specific to that table type.
+  public static final String REFRESH_DURATION_METRIC = "refresh-duration";
+  public static final String ALTER_DURATION_METRIC = "alter-duration";
+  public static final String LOAD_DURATION_METRIC = "load-duration";
+
   protected Table(org.apache.hadoop.hive.metastore.api.Table msTable, Db db,
       String name, String owner) {
     msTable_ = msTable;
@@ -99,12 +119,36 @@ public abstract class Table extends CatalogObjectImpl {
         CatalogServiceCatalog.getLastDdlTime(msTable_) : -1;
     tableStats_ = new TTableStats(-1);
     tableStats_.setTotal_file_bytes(-1);
+    initMetrics();
   }
 
   public ReentrantLock getLock() { return tableLock_; }
   public abstract TTableDescriptor toThriftDescriptor(
       int tableId, Set<Long> referencedPartitions);
   public abstract TCatalogObjectType getCatalogObjectType();
+  public long getMetadataOpsCount() { return metadataOpsCount_.get(); }
+  public long getEstimatedMetadataSize() { return estimatedMetadataSize_.get(); }
+  public void setEstimatedMetadataSize(long estimatedMetadataSize) {
+    estimatedMetadataSize_.set(estimatedMetadataSize);
+    if (!isStoredInImpaladCatalogCache()) {
+      CatalogUsageMonitor.INSTANCE.updateLargestTables(this);
+    }
+  }
+
+  public void incrementMetadataOpsCount() {
+    metadataOpsCount_.incrementAndGet();
+    if (!isStoredInImpaladCatalogCache()) {
+      CatalogUsageMonitor.INSTANCE.updateFrequentlyAccessedTables(this);
+    }
+  }
+
+  public void initMetrics() {
+    metrics_.addTimer(REFRESH_DURATION_METRIC);
+    metrics_.addTimer(ALTER_DURATION_METRIC);
+    metrics_.addTimer(LOAD_DURATION_METRIC);
+  }
+
+  public Metrics getMetrics() { return metrics_; }
 
   // Returns true if this table reference comes from the impalad catalog cache or if it
   // is loaded from the testing framework. Returns false if this table reference points
@@ -527,4 +571,19 @@ public abstract class Table extends CatalogObjectImpl {
     }
     return new Pair<String, Short>(cachePoolName, cacheReplication);
   }
+
+  /**
+   * The implementations of hashCode() and equals() functions are using table names as
+   * unique identifiers of tables. Hence, they should be used with caution and not in
+   * cases where truly unique table objects are needed.
+   */
+  @Override
+  public int hashCode() { return getFullName().hashCode(); }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) return false;
+    if (!(obj instanceof Table)) return false;
+    return getFullName().equals(((Table) obj).getFullName());
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/common/Metrics.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/common/Metrics.java b/fe/src/main/java/org/apache/impala/common/Metrics.java
new file mode 100644
index 0000000..cf8621f
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/common/Metrics.java
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+import java.util.Map.Entry;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Snapshot;
+import com.codahale.metrics.Timer;
+
+/**
+ * Thin wrapper class around MetricRegistry. Allows users to register and access metrics
+ * of various types (counter, meter, histogram, and timer). This class is not thread-safe.
+ * TODO: Expose the metrics in Json format via a toJson() function.
+ */
+public final class Metrics {
+
+  private final MetricRegistry registry_ = new MetricRegistry();
+
+  public Metrics() {}
+
+  public void addCounter(String name) { registry_.counter(name); }
+  public void addMeter(String name) { registry_.meter(name); }
+  public void addHistogram(String name) { registry_.histogram(name); }
+  public void addTimer(String name) { registry_.timer(name); }
+
+  @SuppressWarnings("rawtypes")
+  public <T extends Gauge> void addGauge(String name, T gauge) {
+    registry_.register(name, gauge);
+  }
+
+  /**
+   * Returns a counter named 'name'. If the counter does not exist, it is registered in
+   * the metrics registry.
+   */
+  public Counter getCounter(String name) {
+    Counter counter = registry_.getCounters().get(name);
+    if (counter == null) counter = registry_.counter(name);
+    return counter;
+  }
+
+  /**
+   * Returns a meter named 'name'. If the meter does not exist, it is registered in the
+   * metrics registry.
+   */
+  public Meter getMeter(String name) {
+    Meter meter = registry_.getMeters().get(name);
+    if (meter == null) meter = registry_.meter(name);
+    return meter;
+  }
+
+  /**
+   * Returns a histogram named 'name'. If the histogram does not exist, it is registered
+   * in the metrics registry.
+   */
+  public Histogram getHistogram(String name) {
+    Histogram histogram = registry_.getHistograms().get(name);
+    if (histogram == null) histogram = registry_.histogram(name);
+    return histogram;
+  }
+
+  /**
+   * Returns a timer named 'name'. If the timer does not exist, it is registered in the
+   * metrics registry.
+   */
+  public Timer getTimer(String name) {
+    Timer timer = registry_.getTimers().get(name);
+    if (timer == null) timer = registry_.timer(name);
+    return timer;
+  }
+
+  @SuppressWarnings("rawtypes")
+  public Gauge getGauge(String name) { return registry_.getGauges().get(name); }
+
+  /**
+   * Returns a string representation of all registered metrics.
+   */
+  @Override
+  @SuppressWarnings("rawtypes")
+  public String toString() {
+    StringBuilder result = new StringBuilder();
+    for (Entry<String, Counter> entry: registry_.getCounters().entrySet()) {
+      result.append(entry.getKey() + ": " + String.valueOf(entry.getValue().getCount()));
+      result.append("\n");
+    }
+    for (Entry<String, Timer> entry: registry_.getTimers().entrySet()) {
+      result.append(entry.getKey() + ": " + timerToString(entry.getValue()));
+      result.append("\n");
+    }
+    for (Entry<String, Gauge> entry: registry_.getGauges().entrySet()) {
+      result.append(entry.getKey() + ": " + String.valueOf(entry.getValue().getValue()));
+      result.append("\n");
+    }
+    for (Entry<String, Histogram> entry: registry_.getHistograms().entrySet()) {
+      result.append(entry.getKey() + ": " +
+          snapshotToString(entry.getValue().getSnapshot()));
+      result.append("\n");
+    }
+    return result.toString();
+  }
+
+  /**
+   * Helper function that pretty prints the contents of a timer metric.
+   */
+  private String timerToString(Timer timer) {
+    StringBuilder builder = new StringBuilder();
+    return builder.append("\n\tCount: " + timer.getCount() + "\n")
+        .append("\tMean rate: " + timer.getMeanRate() + "\n")
+        .append("\t1min rate: " + timer.getOneMinuteRate() + "\n")
+        .append("\t5min rate: " + timer.getFiveMinuteRate() + "\n")
+        .append("\t15min rate: " + timer.getFifteenMinuteRate() + "\n")
+        .append(snapshotToString(timer.getSnapshot()))
+        .toString();
+  }
+
+  /**
+   * Helper function that pretty prints the contents of a metric snapshot.
+   */
+  private String snapshotToString(Snapshot snapshot) {
+    StringBuilder builder = new StringBuilder();
+    return builder.append("\n\tMin (msec): " + snapshot.getMin() / 1000000 + "\n")
+        .append("\tMax (msec): " + snapshot.getMax() / 1000000 + "\n")
+        .append("\tMean (msec): " + snapshot.getMean() / 1000000 + "\n")
+        .append("\tMedian (msec): " + snapshot.getMedian() / 1000000 + "\n")
+        .append("\t75th-% (msec): " + snapshot.get75thPercentile() / 1000000 + "\n")
+        .append("\t95th-% (msec): " + snapshot.get95thPercentile() / 1000000 + "\n")
+        .append("\t99th-% (msec): " + snapshot.get99thPercentile() / 1000000 + "\n")
+        .toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index df3b10b..f1422db 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -56,6 +56,7 @@ import org.apache.impala.authorization.User;
 import org.apache.impala.catalog.Catalog;
 import org.apache.impala.catalog.CatalogException;
 import org.apache.impala.catalog.CatalogServiceCatalog;
+import org.apache.impala.catalog.CatalogUsageMonitor;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnNotFoundException;
 import org.apache.impala.catalog.DataSource;
@@ -152,6 +153,7 @@ import org.apache.impala.util.MetaStoreUtil;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
 
+import com.codahale.metrics.Timer;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -368,6 +370,8 @@ public class CatalogOpExecutor {
       throw new InternalException(String.format("Error altering table %s due to lock " +
           "contention.", tbl.getFullName()));
     }
+    final Timer.Context context
+        = tbl.getMetrics().getTimer(Table.ALTER_DURATION_METRIC).time();
     try {
       if (params.getAlter_type() == TAlterTableType.RENAME_VIEW
           || params.getAlter_type() == TAlterTableType.RENAME_TABLE) {
@@ -544,6 +548,7 @@ public class CatalogOpExecutor {
         response.setResult_set(resultSet);
       }
     } finally {
+      context.stop();
       Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
       tbl.getLock().unlock();
     }
@@ -3165,6 +3170,8 @@ public class CatalogOpExecutor {
     if (!catalog_.tryLockTable(table)) {
       throw new InternalException("Error updating the catalog due to lock contention.");
     }
+    final Timer.Context context
+        = table.getMetrics().getTimer(HdfsTable.CATALOG_UPDATE_DURATION_METRIC).time();
     try {
       long newCatalogVersion = catalog_.incrementAndGetCatalogVersion();
       catalog_.getLock().writeLock().unlock();
@@ -3319,6 +3326,7 @@ public class CatalogOpExecutor {
       loadTableMetadata(table, newCatalogVersion, true, false, partsToLoadMetadata);
       addTableToCatalogUpdate(table, response.result);
     } finally {
+      context.stop();
       Preconditions.checkState(!catalog_.getLock().isWriteLockedByCurrentThread());
       table.getLock().unlock();
     }
@@ -3356,7 +3364,10 @@ public class CatalogOpExecutor {
    * This is to help protect against certain scenarios where the table was
    * modified or dropped between the time analysis completed and the the catalog op
    * started executing. However, even with these checks it is possible the table was
-   * modified or dropped/re-created without us knowing. TODO: Track object IDs to
+   * modified or dropped/re-created without us knowing. This function also updates the
+   * table usage counter.
+   *
+   * TODO: Track object IDs to
    * know when a table has been dropped and re-created with the same name.
    */
   private Table getExistingTable(String dbName, String tblName) throws CatalogException {
@@ -3364,6 +3375,7 @@ public class CatalogOpExecutor {
     if (tbl == null) {
       throw new TableNotFoundException("Table not found: " + dbName + "." + tblName);
     }
+    tbl.incrementMetadataOpsCount();
 
     if (!tbl.isLoaded()) {
       throw new CatalogException(String.format("Table '%s.%s' was modified while " +

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/service/JniCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
index e945a3b..ed5a51a 100644
--- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java
+++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java
@@ -45,6 +45,7 @@ import org.apache.impala.thrift.TGetDbsResult;
 import org.apache.impala.thrift.TGetFunctionsRequest;
 import org.apache.impala.thrift.TGetFunctionsResponse;
 import org.apache.impala.thrift.TGetTablesParams;
+import org.apache.impala.thrift.TGetTableMetricsParams;
 import org.apache.impala.thrift.TGetTablesResult;
 import org.apache.impala.thrift.TLogLevel;
 import org.apache.impala.thrift.TPrioritizeLoadRequest;
@@ -197,6 +198,16 @@ public class JniCatalog {
   }
 
   /**
+   * Returns the collected metrics of the table named by the serialized Thrift params.
+   */
+  public String getTableMetrics(byte[] getTableMetricsParams) throws ImpalaException,
+      TException {
+    TGetTableMetricsParams params = new TGetTableMetricsParams();
+    JniUtil.deserializeThrift(protocolFactory_, params, getTableMetricsParams);
+    return catalog_.getTableMetrics(params.table_name);
+  }
+
+  /**
    * Gets the thrift representation of a catalog object.
    */
   public byte[] getCatalogObject(byte[] thriftParams) throws ImpalaException,
@@ -262,4 +273,12 @@ public class JniCatalog {
     TSerializer serializer = new TSerializer(protocolFactory_);
     return serializer.serialize(catalogOpExecutor_.updateCatalog(request));
   }
+
+  /**
+   * Serializes the catalog usage information reported by the catalog into its Thrift
+   * byte representation.
+   */
+  public byte[] getCatalogUsage() throws ImpalaException, TException {
+    return new TSerializer(protocolFactory_).serialize(catalog_.getCatalogUsage());
+  }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/main/java/org/apache/impala/util/TopNCache.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/TopNCache.java b/fe/src/main/java/org/apache/impala/util/TopNCache.java
new file mode 100644
index 0000000..9f6b972
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/util/TopNCache.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Thread-safe class that represents a TOP-N cache of items. It stores the top-N items of
+ * a generic type T based on a user-specified ranking function that returns a numeric
+ * value (long).
+ *
+ * The cache has a maximum capacity (N) of stored items. The implementation allows two
+ * policies with respect to the way new items are handled when maximum capacity is
+ * reached:
+ * a) Always evict policy: A new item will always replace the item with the lowest rank
+ * according to the specified ranking function even if the rank of the newly added
+ * function is lower than the one to be replaced.
+ * b) Rank-based eviction policy: A new item will be added to the cache iff its rank is
+ * higher than the smallest rank in the cache and the item with that rank will be evicted.
+ *
+ * TODO: Replace these two policies with an LFU cache with dynamic aging.
+ */
+public final class TopNCache<T, R extends Long>  {
+
+  // Function used to rank items stored in this cache.
+  private final Function<T, R> function_;
+  // Maximum capacity of this cache.
+  private final int maxCapacity_;
+  // The cache is stored as a priority queue.
+  private final PriorityQueue<T> heap_;
+  // Determines the eviction policy to apply when the cache reaches maximum capacity.
+  // TODO: Convert to enum?
+  private final boolean alwaysEvictAtCapacity_;
+
+  /**
+   * Compares the ranks of two T objects, returning 0 if they are equal, < 0 if the rank
+   * of the first is smaller, or > 0 if the rank of the first object is larger.
+   */
+  private int compareRanks(T t1, T t2) {
+    return function_.apply(t1).compareTo(function_.apply(t2));
+  }
+
+  public TopNCache(Function<T, R> f, int maxCapacity, boolean evictAtCapacity) {
+    Preconditions.checkNotNull(f);
+    Preconditions.checkState(maxCapacity > 0);
+    function_ = f;
+    maxCapacity_ = maxCapacity;
+    heap_ = new PriorityQueue<T>(maxCapacity_,
+        new Comparator<T>() {
+          @Override
+          public int compare(T t1, T t2) { return compareRanks(t1, t2); }
+        }
+    );
+    alwaysEvictAtCapacity_ = evictAtCapacity;
+  }
+
+  /**
+   * Adds or updates an item in the cache. If the item already exists, its rank position
+   * is refreshed by removing and adding the item back to the cache. If the item is not in
+   * the cache and maximum capacity hasn't been reached, the item is added to the cache.
+   * Otherwise, the eviction policy is applied and the item either replaces the cache item
+   * with the lowest rank or it is rejected from the cache if its rank is lower than the
+   * lowest rank in the cache.
+   */
+  public synchronized void putOrUpdate(T item) {
+    if (!heap_.remove(item)) {
+      if (heap_.size() == maxCapacity_) {
+        if (!alwaysEvictAtCapacity_ && compareRanks(item, heap_.peek()) <= 0) {
+          return;
+        }
+        heap_.poll();
+      }
+    }
+    heap_.add(item);
+  }
+
+  /**
+   * Removes an item from the cache.
+   */
+  public synchronized void remove(T item) { heap_.remove(item); }
+
+  /**
+   * Returns the list of all the items in the cache.
+   */
+  public synchronized List<T> listEntries() { return ImmutableList.copyOf(heap_); }
+}
+

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/fe/src/test/java/org/apache/impala/util/TestTopNCache.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/util/TestTopNCache.java b/fe/src/test/java/org/apache/impala/util/TestTopNCache.java
new file mode 100644
index 0000000..1b34599
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/util/TestTopNCache.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.util;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Unit tests for the TopNCache class.
+ */
+public class TestTopNCache {
+
+  /**
+   * Create a TopNCache with 'capacity' max capacity and eviction policy 'policy', and
+   * populate it with the 'numEntries' numbers in [0, 'numEntries').
+   */
+  private static TopNCache<Long, Long> createAndPopulate(int capacity,
+      long numEntries, boolean policy) {
+    TopNCache<Long, Long> cache =
+        new TopNCache<Long, Long>(new Function<Long, Long>() {
+            @Override
+            public Long apply(Long element) { return element; }
+        }, capacity, policy);
+    for (long i = 0; i < numEntries; ++i) cache.putOrUpdate(i);
+    return cache;
+  }
+
+  @Test
+  public void testCreateAndPopulateCache() throws Exception {
+    int[] capacities = {1, 10, 1000};
+    boolean[] evictionPolicies = {true, false};
+    for (int capacity: capacities) {
+      for (boolean policy: evictionPolicies) {
+        TopNCache<Long, Long> cache =
+            createAndPopulate(capacity, 2 * capacity, policy);
+        assertEquals(capacity, cache.listEntries().size());
+        for (long i = 0; i < capacity * 2; i++) cache.remove(i);
+        assertEquals(0, cache.listEntries().size());
+      }
+    }
+  }
+
+  @Test
+  public void testUpdateExistingElements() throws Exception {
+    final int capacity = 10;
+    TopNCache<Long, Long> cache = createAndPopulate(capacity, capacity / 2, true);
+    assertEquals(capacity / 2, cache.listEntries().size());
+    // Adding the same elements should not alter the number of elements stored in the
+    // cache.
+    for (long i = 0; i < capacity / 2; i++) cache.putOrUpdate(i);
+    assertEquals(capacity / 2, cache.listEntries().size());
+  }
+
+  @Test
+  public void testAlwaysEvictPolicy() throws Exception {
+    final int capacity = 10;
+    TopNCache<Long, Long> cache = createAndPopulate(capacity, capacity, true);
+    assertEquals(capacity, cache.listEntries().size());
+    cache.putOrUpdate((long) capacity);
+    assertEquals(capacity, cache.listEntries().size());
+    // Assert that the new element replaced the smallest element in the cache
+    assertTrue(!cache.listEntries().contains(Long.valueOf(0)));
+    cache.putOrUpdate((long) capacity + 1);
+    assertTrue(!cache.listEntries().contains(Long.valueOf(1)));
+    List<Long> cacheElements = cache.listEntries();
+    for (long i = 2; i < capacity + 2; i++) {
+      assertTrue(cacheElements.contains(Long.valueOf(i)));
+    }
+  }
+
+  @Test
+  public void testRankBasedEvictionPolicy() throws Exception {
+    final int capacity = 10;
+    TopNCache<Long, Long> cache = new TopNCache<Long, Long>(
+        new Function<Long, Long>() {
+            @Override
+            public Long apply(Long element) { return element; }
+        }, capacity, false);
+    for (long i = 1; i < capacity + 1; i++) cache.putOrUpdate(i);
+    assertEquals(capacity, cache.listEntries().size());
+    cache.putOrUpdate((long) 0);
+    // 0 shouldn't be added to the cache because its rank is smaller than the lowest rank
+    // in the cache.
+    assertTrue(!cache.listEntries().contains(Long.valueOf(0)));
+    cache.putOrUpdate((long) capacity + 1);
+    assertEquals(capacity, cache.listEntries().size());
+    assertTrue(cache.listEntries().contains(Long.valueOf(capacity + 1)));
+  }
+
+  @Test
+  public void testRankBasedEvictionPolicyWithRandomInput() throws Exception {
+    final int capacity = 5;
+    long[] values = {10, 8, 1, 2, 5, 4, 3, 6, 9, 7};
+    TopNCache<Long, Long> cache = new TopNCache<Long, Long>(
+        new Function<Long, Long>() {
+            @Override
+            public Long apply(Long element) { return element; }
+        }, capacity, false);
+    for (Long entry: values) cache.putOrUpdate(entry);
+    List<Long> entries = cache.listEntries();
+    assertEquals(capacity, entries.size());
+    // Make sure only the top-5 elements are in the cache
+    for (long i = 1; i <= capacity; ++i) {
+      assertTrue(!entries.contains(i));
+      assertTrue(entries.contains(i + capacity));
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/tests/webserver/test_web_pages.py
----------------------------------------------------------------------
diff --git a/tests/webserver/test_web_pages.py b/tests/webserver/test_web_pages.py
index 101f786..54163dc 100644
--- a/tests/webserver/test_web_pages.py
+++ b/tests/webserver/test_web_pages.py
@@ -29,6 +29,7 @@ class TestWebPage(ImpalaTestSuite):
   RESET_GLOG_LOGLEVEL_URL = "http://localhost:{0}/reset_glog_level"
   CATALOG_URL = "http://localhost:{0}/catalog"
   CATALOG_OBJECT_URL = "http://localhost:{0}/catalog_object"
+  TABLE_METRICS_URL = "http://localhost:{0}/table_metrics"
   QUERY_BACKENDS_URL = "http://localhost:{0}/query_backends"
   THREAD_GROUP_URL = "http://localhost:{0}/thread-group"
   # log4j changes do not apply to the statestore since it doesn't
@@ -37,6 +38,7 @@ class TestWebPage(ImpalaTestSuite):
   # one with it.
   TEST_PORTS_WITHOUT_SS = ["25000", "25020"]
   TEST_PORTS_WITH_SS = ["25000", "25010", "25020"]
+  CATALOG_TEST_PORT = ["25020"]
 
   def test_memz(self):
     """test /memz at impalad / statestored / catalogd"""
@@ -151,6 +153,8 @@ class TestWebPage(ImpalaTestSuite):
     self.__test_catalog_object("functional_parquet", "alltypes")
     self.__test_catalog_object("functional", "alltypesnopart")
     self.__test_catalog_object("functional_kudu", "alltypes")
+    self.__test_table_metrics("functional", "alltypes", "total-file-size-bytes")
+    self.__test_table_metrics("functional_kudu", "alltypes", "alter-duration")
 
   def __test_catalog_object(self, db_name, tbl_name):
     """Tests the /catalog_object endpoint for the given db/table. Runs
@@ -164,6 +168,11 @@ class TestWebPage(ImpalaTestSuite):
       "?object_type=TABLE&object_name=%s.%s" % (db_name, tbl_name), tbl_name,
       ports_to_test=self.TEST_PORTS_WITHOUT_SS)
 
+  def __test_table_metrics(self, db_name, tbl_name, metric):
+    self.client.execute("refresh %s.%s" % (db_name, tbl_name))
+    self.get_and_check_status(self.TABLE_METRICS_URL +
+      "?name=%s.%s" % (db_name, tbl_name), metric, ports_to_test=self.CATALOG_TEST_PORT)
+
   def test_query_details(self, unique_database):
     """Test that /query_backends returns the list of backend states for DML or queries;
     nothing for DDL statements"""

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/www/catalog.tmpl
----------------------------------------------------------------------
diff --git a/www/catalog.tmpl b/www/catalog.tmpl
index 5e271bf..ffe7c78 100644
--- a/www/catalog.tmpl
+++ b/www/catalog.tmpl
@@ -20,6 +20,87 @@ under the License.
 
 <h2>Catalog</h2>
 
+{{?has_large_tables}}
+<div class="panel panel-info">
+  <div class="panel-heading">
+      <h2 class="panel-title">
+      Top-{{num_large_tables}} Tables with Highest Memory Requirements
+      </h2>
+  </div>
+  <div class="panel-body">
+    <table id="large-tables" class='table table-hover table-bordered'>
+      <thead>
+        <tr>
+          <th>Name</th>
+          <th>Estimated memory</th>
+          <th>Metrics</th>
+        </tr>
+      </thead>
+      <tbody>
+        {{#large_tables}}
+        <tr>
+          <td><a href="catalog_object?object_type=TABLE&object_name={{name}}">{{name}}</a>
+          </td>
+          <td>{{mem_estimate}}</td>
+          <td><a href="table_metrics?name={{name}}">{{name}}-metrics</a></td>
+        </tr>
+        {{/large_tables}}
+      </tbody>
+    </table>
+  </div>
+</div>
+
+<script>
+    $(document).ready(function() {
+        $('#large-tables').DataTable({
+            "order": [[ 0, "desc" ]],
+            "pageLength": 10
+        });
+    });
+</script>
+{{/has_large_tables}}
+
+{{?has_frequent_tables}}
+<div class="panel panel-info">
+  <div class="panel-heading">
+      <h2 class="panel-title">
+      Top-{{num_frequent_tables}} Tables with Highest Number of Metadata Operations
+      </h2>
+  </div>
+  <div class="panel-body">
+    <table id="frequent-tables" class='table table-hover table-bordered'>
+      <thead>
+        <tr>
+          <th>Name</th>
+          <th>Metadata Operations (since loaded)</th>
+          <th>Metrics</th>
+        </tr>
+      </thead>
+      <tbody>
+        {{#frequent_tables}}
+        <tr>
+          <td><a href="catalog_object?object_type=TABLE&object_name={{name}}">{{name}}</a>
+          </td>
+          <td>{{num_metadata_ops}}</td>
+          <td><a href="table_metrics?name={{name}}">{{name}}-metrics</a></td>
+        </tr>
+        {{/frequent_tables}}
+      </tbody>
+    </table>
+  </div>
+</div>
+
+<script>
+    $(document).ready(function() {
+        $('#frequent-tables').DataTable({
+            "order": [[ 0, "desc" ]],
+            "pageLength": 10
+        });
+    });
+</script>
+{{/has_frequent_tables}}
+
+<h3>Databases</h3>
 <ol class="breadcrumb">
 {{#databases}}
 <li><a href='#{{name}}'>{{name}}</a></li>
@@ -36,16 +117,38 @@ under the License.
     </a>
   </div>
   <div class="panel-body">
-    <ul>
-      {{#tables}}
-      <li>
-        <a href="catalog_object?object_type=TABLE&object_name={{fqtn}}">{{name}}</a>
-      </li>
-      {{/tables}}
-    </ul>
+    <table id="{{name}}-tables" class='table table-hover table-bordered'>
+      <thead>
+        <tr>
+          <th>Name</th>
+          {{?has_metrics}}
+          <th>Metrics</th>
+          {{/has_metrics}}
+        </tr>
+      </thead>
+      <tbody>
+        {{#tables}}
+        <tr>
+          <td><a href="catalog_object?object_type=TABLE&object_name={{fqtn}}">{{name}}</a>
+          </td>
+          {{?has_metrics}}
+          <td><a href="table_metrics?name={{fqtn}}">{{name}}-metrics</a></td>
+          {{/has_metrics}}
+        </tr>
+        {{/tables}}
+      </tbody>
+    </table>
   </div>
 </div>
 
+<script>
+    $(document).ready(function() {
+        $('#{{name}}-tables').DataTable({
+            "pageLength": 5
+        });
+    });
+</script>
+
 {{/databases}}
 
 {{> www/common-footer.tmpl }}

http://git-wip-us.apache.org/repos/asf/impala/blob/3f00d10e/www/table_metrics.tmpl
----------------------------------------------------------------------
diff --git a/www/table_metrics.tmpl b/www/table_metrics.tmpl
new file mode 100644
index 0000000..5140309
--- /dev/null
+++ b/www/table_metrics.tmpl
@@ -0,0 +1,23 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+{{> www/common-header.tmpl }}
+
+<pre>{{table_metrics}}</pre>
+
+{{> www/common-footer.tmpl }}