You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/02/25 21:03:36 UTC

[impala] branch master updated: IMPALA-9381: on-demand conversion of runtime profile

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bd45d2  IMPALA-9381: on-demand conversion of runtime profile
1bd45d2 is described below

commit 1bd45d295ebfc3f526a98eebb9b61525b9332c91
Author: Tim Armstrong <ta...@cloudera.com>
AuthorDate: Mon Feb 17 10:19:41 2020 -0800

    IMPALA-9381: on-demand conversion of runtime profile
    
    Converting the runtime profile to JSON and text representations
    at the end of the query used significant CPU and time. These
    representations will commonly never be accessed, because
    they need to be explicitly requested by a client via the
    HTTP debug interface or via a thrift profile request.
    So it is a waste of resources to eagerly convert them, and
    in particular it is a bad idea to do so on the critical path
    of a query.
    
    This commit switches to generating alternative profile
    representations on-demand. Only the compressed thrift version
    of the profile is stored in QueryStateRecord. This is the
    most compact representation of the profile and it is
    relatively convenient to convert into other formats.
    
    Also use a move() when constructing QueryStateRecord to avoid
    copying the profile unnecessarily.
    
    Fix a couple of potential use-after-free issues where Json
    objects generated by RuntimeProfile::ToJson() could reference
    strings owned by the object pool. These were detected by
    running an ASAN build, because after this change, the temporary
    object pool used to hold the deserialized profile was freed before
    the JSON tree was returned.
    
    The "kind" field of counters is removed from the JSON profile.
    This couldn't be round-tripped correctly through thrift, and
    probably isn't necessary. It also helps slim down the profiles.
    
    Also make sure to preserve the "indent" field when round-tripping
    to thrift.
    
    Testing:
    Ran core tests.
    
    Diffed JSON and text profiles downloaded from the web UI from before and
    after to make sure there were no unexpected changes as a result
    of the round-trip via thrift.
    
    Change-Id: Ic2f5133cc146adc3b044cf4b64aae0a9688449fa
    Reviewed-on: http://gerrit.cloudera.org:8080/15236
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/impala-http-handler.cc  |   5 +-
 be/src/service/impala-http-handler.h   |   6 +-
 be/src/service/impala-server.cc        |  91 ++++++++++++----------------
 be/src/service/impala-server.h         |  41 +++++++------
 be/src/util/runtime-profile-counters.h |  20 ------
 be/src/util/runtime-profile-test.cc    |  17 ++++--
 be/src/util/runtime-profile.cc         | 107 +++++++++++++++++++--------------
 be/src/util/runtime-profile.h          |  20 ++++--
 8 files changed, 159 insertions(+), 148 deletions(-)

diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index 80f5427..5d2e7da 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -480,9 +480,10 @@ void ImpalaHttpHandler::QueryStateHandler(const Webserver::WebRequest& req,
   Value completed_queries(kArrayType);
   {
     lock_guard<mutex> l(server_->query_log_lock_);
-    for (const ImpalaServer::QueryStateRecord& log_entry: server_->query_log_) {
+    for (const unique_ptr<ImpalaServer::QueryStateRecord>& log_entry :
+        server_->query_log_) {
       Value record_json(kObjectType);
-      QueryStateToJson(log_entry, &record_json, document);
+      QueryStateToJson(*log_entry, &record_json, document);
       completed_queries.PushBack(record_json, document->GetAllocator());
     }
   }
diff --git a/be/src/service/impala-http-handler.h b/be/src/service/impala-http-handler.h
index b45bec9..8ecb7a1 100644
--- a/be/src/service/impala-http-handler.h
+++ b/be/src/service/impala-http-handler.h
@@ -79,8 +79,10 @@ class ImpalaHttpHandler {
   void QueryStateHandler(const Webserver::WebRequest& req,
       rapidjson::Document* document);
 
-  /// Json callback for /query_profile. Expects query_id as an argument, produces Json
-  /// with 'profile' set to the profile string, and 'query_id' set to the query ID.
+  /// Json callback for /query_profile. Expects query_id as an argument. If a json
+  /// profile is requested, the JSON profile is returned in 'document' under
+  /// "contents". Otherwise 'document' has 'profile' set to the profile string,
+  /// and 'query_id' set to the query ID.
   void QueryProfileHandler(const Webserver::WebRequest& req,
       rapidjson::Document* document);
 
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 13bd840..83dbea9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -48,6 +48,7 @@
 #include "catalog/catalog-util.h"
 #include "common/compiler-util.h"
 #include "common/logging.h"
+#include "common/object-pool.h"
 #include "common/thread-debug-info.h"
 #include "common/version.h"
 #include "exec/external-data-source-executor.h"
@@ -68,12 +69,13 @@
 #include "runtime/tmp-file-mgr.h"
 #include "scheduling/admission-controller.h"
 #include "service/cancellation-work.h"
-#include "service/impala-http-handler.h"
-#include "service/impala-internal-service.h"
 #include "service/client-request-state.h"
 #include "service/frontend.h"
+#include "service/impala-http-handler.h"
+#include "service/impala-internal-service.h"
 #include "util/auth-util.h"
 #include "util/bit-util.h"
+#include "util/coding-util.h"
 #include "util/debug-util.h"
 #include "util/error-util.h"
 #include "util/histogram-metric.h"
@@ -90,8 +92,8 @@
 #include "util/string-parser.h"
 #include "util/summary-util.h"
 #include "util/test-info.h"
-#include "util/uid-util.h"
 #include "util/time.h"
+#include "util/uid-util.h"
 
 #include "gen-cpp/Types_types.h"
 #include "gen-cpp/ImpalaService.h"
@@ -661,24 +663,23 @@ Status ImpalaServer::GetRuntimeProfileOutput(const TUniqueId& query_id,
     RETURN_IF_ERROR(CheckProfileAccess(user, query_record->second->effective_user,
         query_record->second->user_has_profile_access));
     if (format == TRuntimeProfileFormat::BASE64) {
-      (*output) << query_record->second->encoded_profile_str;
+      Base64Encode(query_record->second->compressed_profile, output);
     } else if (format == TRuntimeProfileFormat::THRIFT) {
-      RETURN_IF_ERROR(RuntimeProfile::DeserializeFromArchiveString(
-          query_record->second->encoded_profile_str, thrift_output));
+      RETURN_IF_ERROR(RuntimeProfile::DecompressToThrift(
+          query_record->second->compressed_profile, thrift_output));
     } else if (format == TRuntimeProfileFormat::JSON) {
-      ParseResult parse_ok = json_output->Parse(
-          query_record->second->json_profile_str.c_str());
-      // When there is an error, the json_output will stay unchanged
-      // based on rapidjson parse API
-      if (!parse_ok){
-        string err = strings::Substitute("JSON parse error: $0 (Offset: $1)",
-            GetParseError_En(parse_ok.Code()), parse_ok.Offset());
-        VLOG(1) << err;
-        return Status::Expected(err);
-      }
+      ObjectPool tmp_pool;
+      RuntimeProfile* tmp_profile;
+      RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile(
+          query_record->second->compressed_profile, &tmp_pool, &tmp_profile));
+      tmp_profile->ToJson(json_output);
     } else {
       DCHECK_EQ(format, TRuntimeProfileFormat::STRING);
-      (*output) << query_record->second->profile_str;
+      ObjectPool tmp_pool;
+      RuntimeProfile* tmp_profile;
+      RETURN_IF_ERROR(RuntimeProfile::DecompressToProfile(
+          query_record->second->compressed_profile, &tmp_pool, &tmp_profile));
+      tmp_profile->PrettyPrint(output);
     }
   }
   return Status::OK();
@@ -790,8 +791,8 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
 }
 
 void ImpalaServer::ArchiveQuery(const ClientRequestState& query) {
-  string encoded_profile_str;
-  Status status = query.profile()->SerializeToArchiveString(&encoded_profile_str);
+  vector<uint8_t> compressed_profile;
+  Status status = query.profile()->Compress(&compressed_profile);
   if (!status.ok()) {
     // Didn't serialize the string. Continue with empty string.
     LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
@@ -803,7 +804,8 @@ void ImpalaServer::ArchiveQuery(const ClientRequestState& query) {
   // FLAGS_log_query_to_file will have been set to false
   if (FLAGS_log_query_to_file) {
     stringstream ss;
-    ss << UnixMillis() << " " << PrintId(query.query_id()) << " " << encoded_profile_str;
+    ss << UnixMillis() << " " << PrintId(query.query_id()) << " ";
+    Base64Encode(compressed_profile, &ss);
     status = profile_logger_->AppendEntry(ss.str());
     if (!status.ok()) {
       LOG_EVERY_N(WARNING, 1000) << "Could not write to profile log file file ("
@@ -815,17 +817,19 @@ void ImpalaServer::ArchiveQuery(const ClientRequestState& query) {
   }
 
   if (FLAGS_query_log_size == 0) return;
-  QueryStateRecord record(query, true, encoded_profile_str);
+  unique_ptr<QueryStateRecord> record =
+      make_unique<QueryStateRecord>(query, move(compressed_profile));
   if (query.GetCoordinator() != nullptr)
-    query.GetCoordinator()->GetTExecSummary(&record.exec_summary);
+    query.GetCoordinator()->GetTExecSummary(&record->exec_summary);
   {
     lock_guard<mutex> l(query_log_lock_);
     // Add record to the beginning of the log, and to the lookup index.
-    query_log_index_[query.query_id()] = query_log_.insert(query_log_.begin(), record);
+    query_log_index_[query.query_id()] = record.get();
+    query_log_.insert(query_log_.begin(), move(record));
 
     if (FLAGS_query_log_size > -1 && FLAGS_query_log_size < query_log_.size()) {
       DCHECK_EQ(query_log_.size() - FLAGS_query_log_size, 1);
-      query_log_index_.erase(query_log_.back().id);
+      query_log_index_.erase(query_log_.back()->id);
       query_log_.pop_back();
     }
   }
@@ -1810,8 +1814,18 @@ void ImpalaServer::BuildLocalBackendDescriptorInternal(TBackendDescriptor* be_de
   be_desc->executor_groups = GetExecutorGroups(FLAGS_executor_groups);
 }
 
-ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& request_state,
-    bool copy_profile, const string& encoded_profile) {
+ImpalaServer::QueryStateRecord::QueryStateRecord(
+    const ClientRequestState& request_state, vector<uint8_t>&& compressed_profile)
+  : compressed_profile(compressed_profile) {
+  Init(request_state);
+}
+
+ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& request_state)
+  : compressed_profile() {
+  Init(request_state);
+}
+
+void ImpalaServer::QueryStateRecord::Init(const ClientRequestState& request_state) {
   id = request_state.query_id();
   const TExecRequest& request = request_state.exec_request();
 
@@ -1837,31 +1851,6 @@ ImpalaServer::QueryStateRecord::QueryStateRecord(const ClientRequestState& reque
 
   request_state.query_events()->ToThrift(&event_sequence);
 
-  if (copy_profile) {
-    stringstream ss;
-    request_state.profile()->PrettyPrint(&ss);
-    profile_str = ss.str();
-
-    Document json_profile(rapidjson::kObjectType);
-    request_state.profile()->ToJson(&json_profile);
-
-    StringBuffer buffer;
-    Writer<StringBuffer> writer(buffer);
-    json_profile.Accept(writer);
-    json_profile_str = buffer.GetString();
-
-    if (encoded_profile.empty()) {
-      Status status =
-          request_state.profile()->SerializeToArchiveString(&encoded_profile_str);
-      if (!status.ok()) {
-        LOG_EVERY_N(WARNING, 1000) << "Could not serialize profile to archive string "
-                                   << status.GetDetail();
-      }
-    } else {
-      encoded_profile_str = encoded_profile;
-    }
-  }
-
   // Save the query fragments so that the plan can be visualised.
   for (const TPlanExecInfo& plan_exec_info:
       request_state.exec_request().query_exec_request.plan_exec_info) {
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 3bf95d5..3623e7c 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -812,16 +812,13 @@ class ImpalaServer : public ImpalaServiceIf,
   /// Initializes the backend descriptor in 'be_desc' with the local backend information.
   void BuildLocalBackendDescriptorInternal(TBackendDescriptor* be_desc);
 
-  /// Snapshot of a query's state, archived in the query log.
+  /// Snapshot of a query's state, archived in the query log. Not mutated after
+  /// construction.
   struct QueryStateRecord {
-    /// Pretty-printed runtime profile. TODO: Copy actual profile object
-    std::string profile_str;
-
-    /// Base64 encoded runtime profile
-    std::string encoded_profile_str;
-
-    /// JSON based runtime profile
-    std::string json_profile_str;
+    /// Compressed representation of profile returned by RuntimeProfile::Compress().
+    /// Must be initialised to a valid value if this is a completed query.
+    /// Empty if this was initialised from a running query.
+    const std::vector<uint8_t> compressed_profile;
 
     /// Query id
     TUniqueId id;
@@ -890,16 +887,20 @@ class ImpalaServer : public ImpalaServiceIf,
     /// string if this request doesn't go through admission control.
     std::string resource_pool;
 
-    /// Initialise from an exec_state. If copy_profile is true, print the query
-    /// profile to a string and copy that into this.profile (which is expensive),
-    /// otherwise leave this.profile empty.
-    /// If encoded_str is non-empty, it is the base64 encoded string for
-    /// exec_state->profile.
-    QueryStateRecord(const ClientRequestState& exec_state, bool copy_profile = false,
-        const std::string& encoded_str = "");
+    /// Initialise from 'exec_state' of a completed query. 'compressed_profile' must be
+    /// a runtime profile compressed with RuntimeProfile::Compress().
+    QueryStateRecord(
+        const ClientRequestState& exec_state, std::vector<uint8_t>&& compressed_profile);
+
+    /// Initialize from 'exec_state' of a running query
+    QueryStateRecord(const ClientRequestState& exec_state);
 
     /// Default constructor used only when participating in collections
     QueryStateRecord() { }
+
+   private:
+    // Common initialization for constructors.
+    void Init(const ClientRequestState& exec_state);
   };
 
   struct QueryStateRecordLessThan {
@@ -1037,11 +1038,13 @@ class ImpalaServer : public ImpalaServiceIf,
   std::mutex query_log_lock_;
 
   /// FIFO list of query records, which are written after the query finishes executing
-  typedef std::list<QueryStateRecord> QueryLog;
+  typedef std::list<std::unique_ptr<QueryStateRecord>> QueryLog;
   QueryLog query_log_;
 
-  /// Index that allows lookup via TUniqueId into the query log
-  typedef boost::unordered_map<TUniqueId, QueryLog::iterator> QueryLogIndex;
+  /// Index that allows lookup via TUniqueId into the query log. The QueryStateRecord
+  /// value is owned by 'query_log_' so the entry in this index must be removed when
+  /// it is removed from 'query_log_'.
+  typedef boost::unordered_map<TUniqueId, QueryStateRecord*> QueryLogIndex;
   QueryLogIndex query_log_index_;
 
   /// Logger for writing encoded query profiles, one per line with the following format:
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 398ef20..21a586a 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -350,10 +350,6 @@ class RuntimeProfile::HighWaterMarkCounter : public RuntimeProfile::Counter {
     UpdateMax(v);
   }
 
-  std::string CounterType() const override {
-    return "HighWaterMarkCounter";
-  }
-
   int64_t current_value() const { return current_value_.Load(); }
 
  private:
@@ -385,10 +381,6 @@ class RuntimeProfile::DerivedCounter : public RuntimeProfile::Counter {
     return counter_fn_();
   }
 
-  std::string CounterType() const override {
-    return "DerivedCounter";
-  }
-
  private:
   SampleFunction counter_fn_;
 };
@@ -433,10 +425,6 @@ class RuntimeProfile::AveragedCounter : public RuntimeProfile::Counter {
     }
   }
 
-  std::string CounterType() const override {
-    return "AveragedCounter";
-  }
-
   /// The value for this counter should be updated through UpdateCounter().
   /// Set() and Add() should not be used.
   void Set(double value) override { DCHECK(false); }
@@ -506,10 +494,6 @@ class RuntimeProfile::SummaryStatsCounter : public RuntimeProfile::Counter {
     val->AddMember("num_of_samples", total_num_values_, document.GetAllocator());
   }
 
-  std::string CounterType() const override {
-    return "SummaryStatsCounter";
-  }
-
  private:
   /// The total number of values seen so far.
   int32_t total_num_values_;
@@ -812,10 +796,6 @@ class RuntimeProfile::ConcurrentTimerCounter : public Counter {
     DCHECK(false);
   }
 
-  std::string CounterType() const override {
-    return "ConcurrentTimerCounter";
-  }
-
  private:
   ConcurrentStopWatch csw_;
 };
diff --git a/be/src/util/runtime-profile-test.cc b/be/src/util/runtime-profile-test.cc
index 5a3c7f5..dcb5bf7 100644
--- a/be/src/util/runtime-profile-test.cc
+++ b/be/src/util/runtime-profile-test.cc
@@ -98,6 +98,18 @@ TEST(CountersTest, Basic) {
   deserialized_profile->GetExecSummary(&exec_summary_result);
   EXPECT_EQ(exec_summary_result.status, status);
 
+  // Serialize/deserialize to compressed binary
+  vector<uint8_t> compressed;
+  EXPECT_OK(profile_a->Compress(&compressed));
+  RuntimeProfile* deserialized_profile2;
+  EXPECT_OK(
+      RuntimeProfile::DecompressToProfile(compressed, &pool, &deserialized_profile2));
+  counter_merged = deserialized_profile2->GetCounter("A");
+  EXPECT_EQ(counter_merged->value(), 1);
+  EXPECT_TRUE(deserialized_profile2->GetCounter("Not there") == nullptr);
+  deserialized_profile2->GetExecSummary(&exec_summary_result);
+  EXPECT_EQ(exec_summary_result.status, status);
+
   // Averaged
   RuntimeProfile* averaged_profile = RuntimeProfile::Create(&pool, "Merged", true);
   averaged_profile->UpdateAverage(from_thrift);
@@ -1214,14 +1226,12 @@ TEST(ToJson, RuntimeProfileToJsonTest) {
     if (itr["counter_name"] == "A") {
       EXPECT_EQ(1, itr["value"].GetInt());
       EXPECT_EQ("UNIT", itr["unit"]);
-      EXPECT_EQ("Counter", itr["kind"]);
     }// check HighWaterMarkCounter
     else if (itr["counter_name"] == "high_water_counter") {
       EXPECT_EQ(20, itr["value"].GetInt());
       EXPECT_EQ("BYTES", itr["unit"]);
-      EXPECT_EQ("HighWaterMarkCounter", itr["kind"]);
     } else {
-      DCHECK(false);
+      EXPECT_TRUE(false) << itr["counter_name"].GetString();
     }
   }
 
@@ -1234,7 +1244,6 @@ TEST(ToJson, RuntimeProfileToJsonTest) {
       EXPECT_EQ(15, itr["avg"].GetInt());
       EXPECT_EQ(2, itr["num_of_samples"].GetInt());
       EXPECT_EQ("TIME_NS", itr["unit"]);
-      EXPECT_TRUE(!itr.HasMember("kind"));
     }
   }
 }
diff --git a/be/src/util/runtime-profile.cc b/be/src/util/runtime-profile.cc
index 1ffc922..005b70a 100644
--- a/be/src/util/runtime-profile.cc
+++ b/be/src/util/runtime-profile.cc
@@ -229,7 +229,8 @@ RuntimeProfile* RuntimeProfile::CreateFromThrift(ObjectPool* pool,
 
   ++*idx;
   for (int i = 0; i < node.num_children; ++i) {
-    profile->AddChild(RuntimeProfile::CreateFromThrift(pool, nodes, idx));
+    bool indent = nodes[*idx].indent;
+    profile->AddChild(RuntimeProfile::CreateFromThrift(pool, nodes, idx), indent);
   }
   return profile;
 }
@@ -782,7 +783,8 @@ void RuntimeProfile::ToJsonCounters(Value* parent, Document* d,
 
       Value counter(kObjectType);
       iter->second->ToJson(*d, &counter);
-      counter.AddMember("counter_name", StringRef(child_counter.c_str()), allocator);
+      Value child_counter_json(child_counter.c_str(), child_counter.size(), allocator);
+      counter.AddMember("counter_name", child_counter_json, allocator);
 
       Value child_counters_json(kArrayType);
       RuntimeProfile::ToJsonCounters(&child_counters_json, d,
@@ -883,11 +885,9 @@ void RuntimeProfile::ToJsonHelper(Value* parent, Document* d) const{
       Value summary_stats_counters_json(kArrayType);
       for (const SummaryStatsCounterMap::value_type& v : summary_stats_map_) {
         Value summary_stats_counter(kObjectType);
-        Value summary_name(v.first.c_str(), allocator);
+        Value summary_name_json(v.first.c_str(), v.first.size(), allocator);
         v.second->ToJson(*d, &summary_stats_counter);
-        // Remove Kind here because it would be redundant information for users
-        summary_stats_counter.RemoveMember("kind");
-        summary_stats_counter.AddMember("counter_name", summary_name, allocator);
+        summary_stats_counter.AddMember("counter_name", summary_name_json, allocator);
         summary_stats_counters_json.PushBack(summary_stats_counter, allocator);
       }
       parent->AddMember(
@@ -1078,14 +1078,7 @@ void RuntimeProfile::PrettyPrint(ostream* s, const string& prefix) const {
   }
 }
 
-Status RuntimeProfile::SerializeToArchiveString(string* out) const {
-  stringstream ss;
-  RETURN_IF_ERROR(SerializeToArchiveString(&ss));
-  *out = ss.str();
-  return Status::OK();
-}
-
-Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
+Status RuntimeProfile::Compress(vector<uint8_t>* out) const {
   Status status;
   TRuntimeProfileTree thrift_object;
   const_cast<RuntimeProfile*>(this)->ToThrift(&thrift_object);
@@ -1101,36 +1094,19 @@ Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
   const auto close_compressor =
       MakeScopeExitTrigger([&compressor]() { compressor->Close(); });
 
-  vector<uint8_t> compressed_buffer;
   int64_t max_compressed_size = compressor->MaxOutputLen(serialized_buffer.size());
   DCHECK_GT(max_compressed_size, 0);
-  compressed_buffer.resize(max_compressed_size);
-  int64_t result_len = compressed_buffer.size();
-  uint8_t* compressed_buffer_ptr = compressed_buffer.data();
+  out->resize(max_compressed_size);
+  int64_t result_len = out->size();
+  uint8_t* compressed_buffer_ptr = out->data();
   RETURN_IF_ERROR(compressor->ProcessBlock(true, serialized_buffer.size(),
       serialized_buffer.data(), &result_len, &compressed_buffer_ptr));
-  compressed_buffer.resize(result_len);
-
-  Base64Encode(compressed_buffer, out);
-  return Status::OK();;
+  out->resize(result_len);
+  return Status::OK();
 }
 
-Status RuntimeProfile::DeserializeFromArchiveString(
-    const std::string& archive_str, TRuntimeProfileTree* out) {
-  int64_t decoded_max;
-  if (!Base64DecodeBufLen(archive_str.c_str(), archive_str.size(), &decoded_max)) {
-    return Status("Error in DeserializeFromArchiveString: Base64DecodeBufLen failed.");
-  }
-
-  vector<uint8_t> decoded_buffer;
-  decoded_buffer.resize(decoded_max);
-  int64_t decoded_len;
-  if (!Base64Decode(archive_str.c_str(), archive_str.size(), decoded_max,
-          reinterpret_cast<char*>(decoded_buffer.data()), &decoded_len)) {
-    return Status("Error in DeserializeFromArchiveString: Base64Decode failed.");
-  }
-  decoded_buffer.resize(decoded_len);
-
+Status RuntimeProfile::DecompressToThrift(
+    const vector<uint8_t>& compressed_profile, TRuntimeProfileTree* out) {
   scoped_ptr<Codec> decompressor;
   MemTracker mem_tracker;
   MemPool mem_pool(&mem_tracker);
@@ -1145,8 +1121,8 @@ Status RuntimeProfile::DeserializeFromArchiveString(
 
   int64_t result_len;
   uint8_t* decompressed_buffer;
-  RETURN_IF_ERROR(decompressor->ProcessBlock(
-      false, decoded_len, decoded_buffer.data(), &result_len, &decompressed_buffer));
+  RETURN_IF_ERROR(decompressor->ProcessBlock(false, compressed_profile.size(),
+      compressed_profile.data(), &result_len, &decompressed_buffer));
 
   uint32_t deserialized_len = static_cast<uint32_t>(result_len);
   RETURN_IF_ERROR(
@@ -1154,6 +1130,47 @@ Status RuntimeProfile::DeserializeFromArchiveString(
   return Status::OK();
 }
 
+Status RuntimeProfile::DecompressToProfile(
+    const vector<uint8_t>& compressed_profile, ObjectPool* pool, RuntimeProfile** out) {
+  TRuntimeProfileTree thrift_profile;
+  RETURN_IF_ERROR(
+      RuntimeProfile::DecompressToThrift(compressed_profile, &thrift_profile));
+  *out = RuntimeProfile::CreateFromThrift(pool, thrift_profile);
+  return Status::OK();
+}
+
+Status RuntimeProfile::SerializeToArchiveString(string* out) const {
+  stringstream ss;
+  RETURN_IF_ERROR(SerializeToArchiveString(&ss));
+  *out = ss.str();
+  return Status::OK();
+}
+
+Status RuntimeProfile::SerializeToArchiveString(stringstream* out) const {
+  vector<uint8_t> compressed_buffer;
+  RETURN_IF_ERROR(Compress(&compressed_buffer));
+  Base64Encode(compressed_buffer, out);
+  return Status::OK();
+}
+
+Status RuntimeProfile::DeserializeFromArchiveString(
+    const std::string& archive_str, TRuntimeProfileTree* out) {
+  int64_t decoded_max;
+  if (!Base64DecodeBufLen(archive_str.c_str(), archive_str.size(), &decoded_max)) {
+    return Status("Error in DeserializeFromArchiveString: Base64DecodeBufLen failed.");
+  }
+
+  vector<uint8_t> decoded_buffer;
+  decoded_buffer.resize(decoded_max);
+  int64_t decoded_len;
+  if (!Base64Decode(archive_str.c_str(), archive_str.size(), decoded_max,
+          reinterpret_cast<char*>(decoded_buffer.data()), &decoded_len)) {
+    return Status("Error in DeserializeFromArchiveString: Base64Decode failed.");
+  }
+  decoded_buffer.resize(decoded_len);
+  return DecompressToThrift(decoded_buffer, out);
+}
+
 void RuntimeProfile::SetTExecSummary(const TExecSummary& summary) {
   lock_guard<SpinLock> l(t_exec_summary_lock_);
   t_exec_summary_ = summary;
@@ -1662,16 +1679,15 @@ void RuntimeProfile::Counter::ToJson(Document& document, Value* val) const {
   DCHECK(unit_itr != _TUnit_VALUES_TO_NAMES.end());
   Value unit_json(unit_itr->second, document.GetAllocator());
   counter_json.AddMember("unit", unit_json, document.GetAllocator());
-  Value kind_json(CounterType().c_str(), document.GetAllocator());
-  counter_json.AddMember("kind", kind_json, document.GetAllocator());
   *val = counter_json;
 }
 
 void RuntimeProfile::TimeSeriesCounter::ToJson(Document& document, Value* val) {
   lock_guard<SpinLock> lock(lock_);
   Value counter_json(kObjectType);
-  counter_json.AddMember("counter_name",
-      StringRef(name_.c_str()), document.GetAllocator());
+
+  Value counter_name_json(name_.c_str(), name_.size(), document.GetAllocator());
+  counter_json.AddMember("counter_name", counter_name_json, document.GetAllocator());
   auto unit_itr = _TUnit_VALUES_TO_NAMES.find(unit_);
   DCHECK(unit_itr != _TUnit_VALUES_TO_NAMES.end());
   Value unit_json(unit_itr->second, document.GetAllocator());
@@ -1709,7 +1725,8 @@ void RuntimeProfile::EventSequence::ToJson(Document& document, Value* value) {
 
   for (const Event& ev: events_) {
     Value event_json(kObjectType);
-    event_json.AddMember("label", StringRef(ev.first.c_str()), document.GetAllocator());
+    Value label_json(ev.first.c_str(), ev.first.size(), document.GetAllocator());
+    event_json.AddMember("label", label_json, document.GetAllocator());
     event_json.AddMember("timestamp", ev.second, document.GetAllocator());
     events_json.PushBack(event_json, document.GetAllocator());
   }
diff --git a/be/src/util/runtime-profile.h b/be/src/util/runtime-profile.h
index 9b3abf8..f2943d5 100644
--- a/be/src/util/runtime-profile.h
+++ b/be/src/util/runtime-profile.h
@@ -121,11 +121,6 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
     /// counter_name, value, kind, unit
     virtual void ToJson(rapidjson::Document& document, rapidjson::Value* val) const;
 
-    ///  Return the name of the counter type
-    virtual string CounterType() const {
-      return "Counter";
-    }
-
     TUnit::type unit() const { return unit_; }
 
    protected:
@@ -326,6 +321,21 @@ class RuntimeProfile { // NOLINT: This struct is not packed, but there are not s
   void ToJsonHelper(rapidjson::Value* parent, rapidjson::Document* d) const;
   void ToJson(rapidjson::Document* d) const;
 
+  /// Serializes the runtime profile to a buffer.  This first serializes the
+  /// object using thrift compact binary format and then gzip compresses it.
+  /// This is not a lightweight operation and should not be in the hot path.
+  Status Compress(std::vector<uint8_t>* out) const;
+
+  /// Deserializes a compressed profile into a TRuntimeProfileTree. 'compressed_profile'
+  /// is expected to have been serialized by Compress().
+  static Status DecompressToThrift(
+      const std::vector<uint8_t>& compressed_profile, TRuntimeProfileTree* out);
+
+  /// Deserializes a compressed profile into a RuntimeProfile tree owned by 'pool'.
+  /// 'compressed_profile' is expected to have been serialized by Compress().
+  static Status DecompressToProfile(const std::vector<uint8_t>& compressed_profile,
+      ObjectPool* pool, RuntimeProfile** out);
+
   /// Serializes the runtime profile to a string.  This first serializes the
   /// object using thrift compact binary format, then gzip compresses it and
   /// finally encodes it as base64.  This is not a lightweight operation and