You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/10/04 21:52:42 UTC

[2/4] impala git commit: IMPALA-7527: add fetch-from-catalogd cache info to profile

IMPALA-7527: add fetch-from-catalogd cache info to profile

Reapplies reverted IMPALA-7527. Adding a top-level entry to the profile
broke downstream consumers. The change here is to add the additional stats
to the summary profile.

This patch adds a Java wrapper for a RuntimeProfile object. The wrapper
supports some basic operations like non-hierarchical counters and
informational strings.

During planning, a profile is created, and passed back to the backend as
part of the ExecRequest. The backend then updates the query profile
based on the info emitted from the frontend.

This patch also adds the first use case for this profile information:
the CatalogdMetaProvider emits counters for cache hits, misses, and
fetch times, broken down by metadata category.

The emitted profile is a bit of a superset of the existing 'timeline'
functionality. However, it seems that some tools may parse the timeline
in its current location in the profile, so moving it might be
incompatible. I elected to leave that alone for now and just emit
counters in the new profile.

Change-Id: I419be157168cddb7521ea61e8f86733306b9315e
Reviewed-on: http://gerrit.cloudera.org:8080/11569
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e6bbe4ea
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e6bbe4ea
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e6bbe4ea

Branch: refs/heads/master
Commit: e6bbe4eaf5ba606ea3f4f1ed3360ecf9172a9ec3
Parents: ccec241
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Sep 4 19:07:12 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Oct 4 01:37:45 2018 +0000

----------------------------------------------------------------------
 be/src/service/client-request-state.cc          |  10 ++
 be/src/service/client-request-state.h           |   9 +
 be/src/service/impala-server.cc                 |   1 +
 be/src/util/runtime-profile.h                   |   2 +-
 common/thrift/Frontend.thrift                   |   4 +
 .../catalog/local/CatalogdMetaProvider.java     |  92 ++++++++++-
 .../org/apache/impala/service/Frontend.java     |  13 +-
 .../apache/impala/service/FrontendProfile.java  | 163 +++++++++++++++++++
 .../catalog/local/CatalogdMetaProviderTest.java |  20 +++
 tests/custom_cluster/test_local_catalog.py      |   5 +-
 10 files changed, 301 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/be/src/service/client-request-state.cc
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.cc b/be/src/service/client-request-state.cc
index ca871ab..c19c974 100644
--- a/be/src/service/client-request-state.cc
+++ b/be/src/service/client-request-state.cc
@@ -77,6 +77,7 @@ ClientRequestState::ClientRequestState(
     coord_exec_called_(false),
     // Profile is assigned name w/ id after planning
     profile_(RuntimeProfile::Create(&profile_pool_, "Query")),
+    frontend_profile_(RuntimeProfile::Create(&profile_pool_, "Frontend")),
     server_profile_(RuntimeProfile::Create(&profile_pool_, "ImpalaServer")),
     summary_profile_(RuntimeProfile::Create(&profile_pool_, "Summary")),
     frontend_(frontend),
@@ -118,6 +119,8 @@ ClientRequestState::ClientRequestState(
       "Sql Statement", query_ctx_.client_request.stmt);
   summary_profile_->AddInfoString("Coordinator",
       TNetworkAddressToString(exec_env->GetThriftBackendAddress()));
+
+  summary_profile_->AddChild(frontend_profile_);
 }
 
 ClientRequestState::~ClientRequestState() {
@@ -138,6 +141,13 @@ Status ClientRequestState::SetResultCache(QueryResultSet* cache,
   return Status::OK();
 }
 
+void ClientRequestState::SetFrontendProfile(TRuntimeProfileNode profile) {
+  // Should we defer creating and adding the child until here? probably.
+  TRuntimeProfileTree prof_tree;
+  prof_tree.nodes.emplace_back(std::move(profile));
+  frontend_profile_->Update(prof_tree);
+}
+
 Status ClientRequestState::Exec(TExecRequest* exec_request) {
   MarkActive();
   exec_request_ = *exec_request;

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/be/src/service/client-request-state.h
----------------------------------------------------------------------
diff --git a/be/src/service/client-request-state.h b/be/src/service/client-request-state.h
index 9442a0d..7ff5285 100644
--- a/be/src/service/client-request-state.h
+++ b/be/src/service/client-request-state.h
@@ -66,6 +66,11 @@ class ClientRequestState {
 
   ~ClientRequestState();
 
+  /// Sets the profile that is produced by the frontend. The frontend creates the
+  /// profile during planning and returns it to the backend via TExecRequest,
+  /// which then sets the frontend profile.
+  void SetFrontendProfile(TRuntimeProfileNode profile);
+
   /// Based on query type, this either initiates execution of a exec_request or submits
   /// the query to the Admission controller for asynchronous admission control. When this
   /// returns the operation state is either RUNNING_STATE or PENDING_STATE.
@@ -352,6 +357,9 @@ class ClientRequestState {
   /// The ClientRequestState builds three separate profiles.
   /// * profile_ is the top-level profile which houses the other
   ///   profiles, plus the query timeline
+  /// * frontend_profile_ is the profile emitted by the frontend
+  ///   during planning. Added to summary_profile_ so as to avoid
+  ///   breaking other tools that depend on the profile_ layout.
   /// * summary_profile_ contains mostly static information about the
   ///   query, including the query statement, the plan and the user who submitted it.
   /// * server_profile_ tracks time spent inside the ImpalaServer,
@@ -370,6 +378,7 @@ class ClientRequestState {
   /// - Query Status
   /// - Error logs
   RuntimeProfile* const profile_;
+  RuntimeProfile* const frontend_profile_;
   RuntimeProfile* const server_profile_;
   RuntimeProfile* const summary_profile_;
   RuntimeProfile::Counter* row_materialization_timer_;

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 649f1fc..af9414c 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -965,6 +965,7 @@ Status ImpalaServer::ExecuteInternal(
     (*request_state)->set_user_profile_access(result.user_has_profile_access);
     (*request_state)->summary_profile()->AddEventSequence(
         result.timeline.name, result.timeline);
+    (*request_state)->SetFrontendProfile(result.profile);
     if (result.__isset.result_set_metadata) {
       (*request_state)->set_result_metadata(result.result_set_metadata);
     }

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/be/src/util/runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index cae2462..a6b06ba 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -263,7 +263,7 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   Counter* inactive_timer() { return counter_map_[INACTIVE_TIME_COUNTER_NAME]; }
   int64_t local_time() { return local_time_ns_; }
 
-  /// Prints the counters in a name: value format.
+  /// Prints the contents of the profile in a name: value format.
   /// Does not hold locks when it makes any function calls.
   void PrettyPrint(std::ostream* s, const std::string& prefix="") const;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/common/thrift/Frontend.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/Frontend.thrift b/common/thrift/Frontend.thrift
index f8cd472..ffbfc07 100644
--- a/common/thrift/Frontend.thrift
+++ b/common/thrift/Frontend.thrift
@@ -634,6 +634,7 @@ struct TExecRequest {
   10: optional TSetQueryOptionRequest set_query_option_request
 
   // Timeline of planner's operation, for profiling
+  // TODO(todd): should integrate this with the 'profile' member instead.
   11: optional RuntimeProfile.TEventSequence timeline
 
   // If false, the user that runs this statement doesn't have access to the runtime
@@ -643,6 +644,9 @@ struct TExecRequest {
 
   // Set iff stmt_type is ADMIN_FN.
   13: optional TAdminRequest admin_request
+
+  // Profile information from the planning process.
+  14: optional RuntimeProfile.TRuntimeProfileNode profile
 }
 
 // Parameters to FeSupport.cacheJar().

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 2684c33..62f1d3e 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -48,6 +48,7 @@ import org.apache.impala.common.InternalException;
 import org.apache.impala.common.Pair;
 import org.apache.impala.common.Reference;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.service.FrontendProfile;
 import org.apache.impala.thrift.CatalogLookupStatus;
 import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TCatalogInfoSelector;
@@ -66,6 +67,7 @@ import org.apache.impala.thrift.TPartialPartitionInfo;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableInfoSelector;
 import org.apache.impala.thrift.TUniqueId;
+import org.apache.impala.thrift.TUnit;
 import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
 import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
 import org.apache.impala.util.ListMap;
@@ -81,6 +83,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
@@ -185,6 +188,25 @@ public class CatalogdMetaProvider implements MetaProvider {
    */
   private static final Object DB_LIST_CACHE_KEY = new Object();
 
+  private static final String CATALOG_FETCH_PREFIX = "CatalogFetch";
+  private static final String DB_LIST_STATS_CATEGORY = "DatabaseList";
+  private static final String DB_METADATA_STATS_CATEGORY = "Databases";
+  private static final String TABLE_NAMES_STATS_CATEGORY = "TableNames";
+  private static final String TABLE_METADATA_CACHE_CATEGORY = "Tables";
+  private static final String PARTITION_LIST_STATS_CATEGORY = "PartitionLists";
+  private static final String PARTITIONS_STATS_CATEGORY = "Partitions";
+  private static final String COLUMN_STATS_STATS_CATEGORY = "ColumnStats";
+  private static final String GLOBAL_CONFIGURATION_STATS_CATEGORY = "Config";
+  private static final String FUNCTION_LIST_STATS_CATEGORY = "FunctionLists";
+  private static final String FUNCTIONS_STATS_CATEGORY = "Functions";
+  private static final String RPC_STATS_CATEGORY = "RPCs";
+  private static final String RPC_REQUESTS =
+      CATALOG_FETCH_PREFIX + "." + RPC_STATS_CATEGORY + ".Requests";
+  private static final String RPC_BYTES =
+      CATALOG_FETCH_PREFIX + "." + RPC_STATS_CATEGORY + ".Bytes";
+  private static final String RPC_TIME =
+      CATALOG_FETCH_PREFIX + "." + RPC_STATS_CATEGORY + ".Time";
+
   /**
    * File descriptors store replicas using a compressed format that references hosts
    * by index in a "host index" list rather than by their full addresses. Since we cache
@@ -290,11 +312,20 @@ public class CatalogdMetaProvider implements MetaProvider {
       TGetPartialCatalogObjectRequest req)
       throws TException {
     TGetPartialCatalogObjectResponse resp;
-    byte[] ret;
+    byte[] ret = null;
+    Stopwatch sw = new Stopwatch().start();
     try {
       ret = FeSupport.GetPartialCatalogObject(new TSerializer().serialize(req));
     } catch (InternalException e) {
       throw new TException(e);
+    } finally {
+      sw.stop();
+      FrontendProfile profile = FrontendProfile.getCurrentOrNull();
+      if (profile != null) {
+        profile.addToCounter(RPC_REQUESTS, TUnit.NONE, 1);
+        profile.addToCounter(RPC_BYTES, TUnit.BYTES, ret == null ? 0 : ret.length);
+        profile.addToCounter(RPC_TIME, TUnit.TIME_MS, sw.elapsed(TimeUnit.MILLISECONDS));
+      }
     }
     resp = new TGetPartialCatalogObjectResponse();
     new TDeserializer().deserialize(resp, ret);
@@ -343,13 +374,15 @@ public class CatalogdMetaProvider implements MetaProvider {
 
   @SuppressWarnings("unchecked")
   private <CacheKeyType, ValueType> ValueType loadWithCaching(String itemString,
-      CacheKeyType key, final Callable<ValueType> loadCallable) throws TException {
+      String statsCategory, CacheKeyType key,
+      final Callable<ValueType> loadCallable) throws TException {
     // TODO(todd): there a race here if an invalidation comes in while we are
     // fetching here. Perhaps we need some locking, or need to remember the
     // version numbers of the invalidation messages and ensure that we don't
     // 'put' an element with a too-old version? See:
     // https://softwaremill.com/race-condition-cache-guava-caffeine/
     final Reference<Boolean> hit = new Reference<>(true);
+    Stopwatch sw = new Stopwatch().start();
     try {
       return (ValueType)cache_.get(key, new Callable<ValueType>() {
         @Override
@@ -362,13 +395,39 @@ public class CatalogdMetaProvider implements MetaProvider {
       Throwables.propagateIfPossible(e.getCause(), TException.class);
       throw new RuntimeException(e);
     } finally {
+      sw.stop();
+      addStatsToProfile(statsCategory, /*numHits=*/hit.getRef() ? 1 : 0,
+          /*numMisses=*/hit.getRef() ? 0 : 1, sw);
       LOG.trace("Request for {}: {}", itemString, hit.getRef() ? "hit" : "miss");
     }
   }
 
+  /**
+   * Adds basic statistics to the query's profile when accessing cache entries.
+   * For each cache request, the number of hits, misses, and elapsed time is aggregated.
+   * Cache requests for different types of cache entries, such as function names vs.
+   * table names, are differentiated by a 'statsCategory'.
+   */
+  private void addStatsToProfile(String statsCategory, int numHits, int numMisses,
+      Stopwatch stopwatch) {
+    FrontendProfile profile = FrontendProfile.getCurrentOrNull();
+    if (profile == null) return;
+    final String prefix = CATALOG_FETCH_PREFIX + "." +
+        Preconditions.checkNotNull(statsCategory) + ".";
+    profile.addToCounter(prefix + "Requests", TUnit.NONE, numHits + numMisses);;
+    profile.addToCounter(prefix + "Time", TUnit.TIME_MS,
+        stopwatch.elapsed(TimeUnit.MILLISECONDS));
+    if (numHits > 0) {
+      profile.addToCounter(prefix + "Hits", TUnit.NONE, numHits);
+    }
+    if (numMisses > 0) {
+      profile.addToCounter(prefix + "Misses", TUnit.NONE, numMisses);
+    }
+  }
+
   @Override
   public ImmutableList<String> loadDbList() throws TException {
-    return loadWithCaching("database list", DB_LIST_CACHE_KEY,
+    return loadWithCaching("database list", DB_LIST_STATS_CATEGORY, DB_LIST_CACHE_KEY,
         new Callable<ImmutableList<String>>() {
           @Override
           public ImmutableList<String> call() throws Exception {
@@ -415,6 +474,7 @@ public class CatalogdMetaProvider implements MetaProvider {
   @Override
   public Database loadDb(final String dbName) throws TException {
     return loadWithCaching("database metadata for " + dbName,
+        DB_METADATA_STATS_CATEGORY,
         new DbCacheKey(dbName, DbCacheKey.DbInfoType.HMS_METADATA),
         new Callable<Database>() {
           @Override
@@ -433,6 +493,7 @@ public class CatalogdMetaProvider implements MetaProvider {
   public ImmutableList<String> loadTableNames(final String dbName)
       throws MetaException, UnknownDBException, TException {
     return loadWithCaching("table names for database " + dbName,
+        TABLE_NAMES_STATS_CATEGORY,
         new DbCacheKey(dbName, DbCacheKey.DbInfoType.TABLE_NAMES),
         new Callable<ImmutableList<String>>() {
           @Override
@@ -474,6 +535,7 @@ public class CatalogdMetaProvider implements MetaProvider {
     TableCacheKey cacheKey = new TableCacheKey(dbName, tableName);
     TableMetaRefImpl ref = loadWithCaching(
         "table metadata for " + dbName + "." + tableName,
+        TABLE_METADATA_CACHE_CATEGORY,
         cacheKey,
         new Callable<TableMetaRefImpl>() {
           @Override
@@ -493,6 +555,7 @@ public class CatalogdMetaProvider implements MetaProvider {
   @Override
   public List<ColumnStatisticsObj> loadTableColumnStatistics(final TableMetaRef table,
       List<String> colNames) throws TException {
+    Stopwatch sw = new Stopwatch().start();
     List<ColumnStatisticsObj> ret = Lists.newArrayListWithCapacity(colNames.size());
     // Look up in cache first, keeping track of which ones are missing.
     // We can't use 'loadWithCaching' since we need to fetch several entries batched
@@ -534,6 +597,9 @@ public class CatalogdMetaProvider implements MetaProvider {
             NEGATIVE_COLUMN_STATS_SENTINEL);
       }
     }
+    sw.stop();
+    addStatsToProfile(COLUMN_STATS_STATS_CATEGORY,
+        hitCount + negativeHitCount, missingCols.size(), sw);
     LOG.trace("Request for column stats of {}: hit {}/ neg hit {} / miss {}",
         table, hitCount, negativeHitCount, missingCols.size());
     return ret;
@@ -544,8 +610,8 @@ public class CatalogdMetaProvider implements MetaProvider {
   public List<PartitionRef> loadPartitionList(final TableMetaRef table)
       throws TException {
     PartitionListCacheKey key = new PartitionListCacheKey((TableMetaRefImpl) table);
-    return (List<PartitionRef>) loadWithCaching(
-        "partition list for " + table, key, new Callable<List<PartitionRef>>() {
+    return (List<PartitionRef>) loadWithCaching("partition list for " + table,
+        PARTITION_LIST_STATS_CATEGORY, key, new Callable<List<PartitionRef>>() {
           /** Called to load cache for cache misses */
           @Override
           public List<PartitionRef> call() throws Exception {
@@ -574,13 +640,13 @@ public class CatalogdMetaProvider implements MetaProvider {
       throws MetaException, TException {
     Preconditions.checkArgument(table instanceof TableMetaRefImpl);
     TableMetaRefImpl refImpl = (TableMetaRefImpl)table;
-
+    Stopwatch sw = new Stopwatch().start();
     // Load what we can from the cache.
     Map<PartitionRef, PartitionMetadata> refToMeta = loadPartitionsFromCache(refImpl,
         hostIndex, partitionRefs);
 
-    LOG.trace("Request for partitions of {}: hit {}/{}", table, refToMeta.size(),
-        partitionRefs.size());
+    final int numHits = refToMeta.size();
+    final int numMisses = partitionRefs.size() - numHits;
 
     // Load the remainder from the catalogd.
     List<PartitionRef> missingRefs = Lists.newArrayList();
@@ -594,6 +660,10 @@ public class CatalogdMetaProvider implements MetaProvider {
       // Write back to the cache.
       storePartitionsInCache(refImpl, hostIndex, fromCatalogd);
     }
+    sw.stop();
+    addStatsToProfile(PARTITIONS_STATS_CATEGORY, refToMeta.size(), numMisses, sw);
+    LOG.trace("Request for partitions of {}: hit {}/{}", table, refToMeta.size(),
+        partitionRefs.size());
 
     // Convert the returned map to be by-name instead of by-ref.
     Map<String, PartitionMetadata> nameToMeta = Maps.newHashMapWithExpectedSize(
@@ -724,7 +794,9 @@ public class CatalogdMetaProvider implements MetaProvider {
   @Override
   public String loadNullPartitionKeyValue() throws MetaException, TException {
     return (String) loadWithCaching("null partition key value",
-        NULL_PARTITION_KEY_VALUE_CACHE_KEY, new Callable<String>() {
+        GLOBAL_CONFIGURATION_STATS_CATEGORY,
+        NULL_PARTITION_KEY_VALUE_CACHE_KEY,
+        new Callable<String>() {
           /** Called to load cache for cache misses */
           @Override
           public String call() throws Exception {
@@ -736,6 +808,7 @@ public class CatalogdMetaProvider implements MetaProvider {
   @Override
   public List<String> loadFunctionNames(final String dbName) throws TException {
     return loadWithCaching("function names for database " + dbName,
+        FUNCTION_LIST_STATS_CATEGORY,
         new DbCacheKey(dbName, DbCacheKey.DbInfoType.FUNCTION_NAMES),
         new Callable<ImmutableList<String>>() {
           @Override
@@ -755,6 +828,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       final String functionName) throws TException {
     ImmutableList<TFunction> thriftFuncs = loadWithCaching(
         "function " + dbName + "." + functionName,
+        FUNCTIONS_STATS_CATEGORY,
         new FunctionsCacheKey(dbName, functionName),
         new Callable<ImmutableList<TFunction>>() {
           @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/fe/src/main/java/org/apache/impala/service/Frontend.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 39983ea..d31b573 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1061,11 +1061,14 @@ public class Frontend {
       throws ImpalaException {
     // Timeline of important events in the planning process, used for debugging
     // and profiling.
-    EventSequence timeline = new EventSequence("Query Compilation");
-    TExecRequest result = getTExecRequest(queryCtx, timeline, explainString);
-    timeline.markEvent("Planning finished");
-    result.setTimeline(timeline.toThrift());
-    return result;
+    try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+      EventSequence timeline = new EventSequence("Query Compilation");
+      TExecRequest result = getTExecRequest(queryCtx, timeline, explainString);
+      timeline.markEvent("Planning finished");
+      result.setTimeline(timeline.toThrift());
+      result.setProfile(FrontendProfile.getCurrent().emitAsThrift());
+      return result;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/FrontendProfile.java b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
new file mode 100644
index 0000000..3344bf6
--- /dev/null
+++ b/fe/src/main/java/org/apache/impala/service/FrontendProfile.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.impala.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import org.apache.impala.thrift.TCounter;
+import org.apache.impala.thrift.TRuntimeProfileNode;
+import org.apache.impala.thrift.TUnit;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+
+/**
+ * Wrapper class for creating a runtime profile within the frontend.
+ *
+ * In order to avoid plumbing an object through all code that might want to emit counters
+ * into the profile, this class provides some support for storing a current profile in
+ * a thread-local variable.
+ *
+ * This class is thread-safe.
+ */
+@ThreadSafe
+public class FrontendProfile {
+  private static final String ROOT_COUNTER_NAME = "";
+
+  private static ThreadLocal<FrontendProfile> THREAD_LOCAL =
+      new ThreadLocal<>();
+
+  @GuardedBy("this")
+  private TRuntimeProfileNode profile_;
+
+  /**
+   * Name-based access to the counters in the profile_.counters List<TCounter>.
+   */
+  @GuardedBy("this")
+  private final Map<String, TCounter> countersByName_ = new HashMap<>();
+
+  FrontendProfile() {
+    profile_ = new TRuntimeProfileNode("Frontend",
+        /*num_children=*/ 0,
+        /*counters=*/new ArrayList<>(),
+        /*metadata=*/-1L, // TODO(todd) what is this used for? why is it required?
+        /*indent=*/false,
+        /*info_strings=*/new HashMap<>(),
+        /*info_strings_display_order*/new ArrayList<>(),
+        /*child_counters_map=*/ImmutableMap.of(ROOT_COUNTER_NAME, new HashSet<>()));
+  }
+
+  /**
+   * Create a new profile, setting it as the current thread-local profile for the
+   * length of the current scope. This is meant to be used in a try-with-resources
+   * statement. Supports at most one scope per thread. No nested scopes are currently
+   * allowed.
+   */
+  public static Scope createNewWithScope() {
+    return new Scope(new FrontendProfile());
+  }
+
+  /**
+   * Get the profile attached to the current thread, throw IllegalStateException if there
+   * is none.
+   */
+  @Nonnull
+  public static FrontendProfile getCurrent() {
+    FrontendProfile prof = THREAD_LOCAL.get();
+    Preconditions.checkState(prof != null, "no profile in scope");
+    return prof;
+  }
+
+  /**
+   * Get the profile attached to the current thread, or null if there is no current
+   * profile.
+   */
+  @Nullable
+  public static FrontendProfile getCurrentOrNull() {
+    return THREAD_LOCAL.get();
+  }
+
+  /**
+   * Return the profile in Thrift format. This may be called only once, and after it is
+   * called, no further methods may be used on this FrontendProfile object. Any attempts
+   * to do so will result in IllegalStateExceptions.
+   */
+  public synchronized TRuntimeProfileNode emitAsThrift() {
+    Preconditions.checkState(profile_ != null, "already emitted profile");
+    TRuntimeProfileNode ret = profile_;
+    profile_ = null;
+    return ret;
+  }
+
+  /**
+   * Add an informational key/value string pair to the profile. These are written out
+   * as is to the user. Subsequent calls with the same key will overwrite previous ones.
+   */
+  public synchronized void addInfoString(String key, String val) {
+    Preconditions.checkState(profile_ != null, "already emitted profile");
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(val);
+    if (profile_.getInfo_strings().put(key, val) == null) {
+      // If it's a new info string instead of replacing an existing one,
+      // we need to also include it in the 'ordering' list.
+      profile_.getInfo_strings_display_order().add(key);
+    }
+  }
+
+  /**
+   * Add 'delta' to the counter with the given name and unit. Counters are created
+   * on-demand.
+   */
+  public synchronized void addToCounter(String name, TUnit unit, long delta) {
+    Preconditions.checkState(profile_ != null, "already emitted profile");
+    TCounter counter = countersByName_.get(Preconditions.checkNotNull(name));
+    if (counter == null) {
+      // Need to create the counter.
+      counter = new TCounter(name, unit, 0);
+      countersByName_.put(name, counter);
+      profile_.counters.add(counter);
+      // Currently we don't support hierarchical counters in the frontend.
+      profile_.child_counters_map.get(ROOT_COUNTER_NAME).add(name);
+    }
+    counter.value += delta;
+  }
+
+
+  public static class Scope implements AutoCloseable {
+    private final FrontendProfile oldThreadLocalValue_;
+
+    private Scope(FrontendProfile profile) {
+      oldThreadLocalValue_ = THREAD_LOCAL.get();
+      // TODO: remove when allowing nested scopes.
+      Preconditions.checkState(oldThreadLocalValue_ == null);
+      THREAD_LOCAL.set(profile);
+    }
+
+    @Override
+    public void close() {
+      THREAD_LOCAL.set(oldThreadLocalValue_);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
----------------------------------------------------------------------
diff --git a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
index 74264c6..de6dd07 100644
--- a/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/local/CatalogdMetaProviderTest.java
@@ -19,6 +19,7 @@ package org.apache.impala.catalog.local;
 
 import static org.junit.Assert.*;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -31,11 +32,13 @@ import org.apache.impala.catalog.local.MetaProvider.PartitionRef;
 import org.apache.impala.catalog.local.MetaProvider.TableMetaRef;
 import org.apache.impala.common.Pair;
 import org.apache.impala.service.FeSupport;
+import org.apache.impala.service.FrontendProfile;
 import org.apache.impala.thrift.TBackendGflags;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TRuntimeProfileNode;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.util.ListMap;
 import org.junit.Test;
@@ -221,4 +224,21 @@ public class CatalogdMetaProviderTest {
     assertEquals(0, stats.hitCount());
     assertEquals(1, stats.missCount());
   }
+
+  @Test
+  public void testProfile() throws Exception {
+    FrontendProfile profile;
+    try (FrontendProfile.Scope scope = FrontendProfile.createNewWithScope()) {
+      provider_.loadTable("functional", "alltypes");
+      profile = FrontendProfile.getCurrent();
+    }
+    TRuntimeProfileNode prof = profile.emitAsThrift();
+    assertEquals(3, prof.counters.size());
+    Collections.sort(prof.counters);
+    assertEquals("TCounter(name:CatalogFetch.Tables.Hits, unit:NONE, value:1)",
+        prof.counters.get(0).toString());
+    assertEquals("TCounter(name:CatalogFetch.Tables.Requests, unit:NONE, value:1)",
+        prof.counters.get(1).toString());
+    assertEquals("CatalogFetch.Tables.Time", prof.counters.get(2).name);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/impala/blob/e6bbe4ea/tests/custom_cluster/test_local_catalog.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index b78ef88..14a9a54 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -306,9 +306,8 @@ class TestCompactCatalogUpdates(CustomClusterTestSuite):
       for _ in xrange(0, 10):
         for query in queries_to_test:
           ret = self.execute_query_expect_success(client, query)
-          # TODO: re-enable checks when counters are put back into profile
-          # assert ret.runtime_profile.count("Frontend:") == 1
-          # assert ret.runtime_profile.count("CatalogFetch") > 1
+          assert ret.runtime_profile.count("Frontend:") == 1
+          assert ret.runtime_profile.count("CatalogFetch") > 1
           cache_metrics = self.get_catalog_cache_metrics(impalad)
           cache_hit_rate = cache_metrics[cache_hit_rate_metric_key]
           cache_miss_rate = cache_metrics[cache_miss_rate_metric_key]