Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/15 06:45:44 UTC

[2/2] impala git commit: IMPALA-4953, IMPALA-6437: separate AC/scheduler from catalog topic updates

IMPALA-4953,IMPALA-6437: separate AC/scheduler from catalog topic updates

This adds a set of "prioritized" statestore topics that are small but
must be delivered in a timely manner. These are delivered more
frequently by a separate thread pool to reduce the window for stale
admission control and scheduling information.
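
A minimal sketch, assuming the two constants added by this patch are the
whole prioritized set (the IsPrioritizedTopic() helper referenced in the
statestore.cc hunk below is not shown in these hunks, so the body here is an
assumption):

  #include <string>

  // Sketch only: the patch adds these constants to Statestore; whether
  // IsPrioritizedTopic() is implemented exactly this way is an assumption.
  static const std::string IMPALA_MEMBERSHIP_TOPIC("impala-membership");
  static const std::string IMPALA_REQUEST_QUEUE_TOPIC("impala-request-queue");

  static bool IsPrioritizedTopic(const std::string& topic_id) {
    // Only the small membership and admission-control (request-queue) topics
    // are prioritized, so they can be sent on the faster schedule.
    return topic_id == IMPALA_MEMBERSHIP_TOPIC
        || topic_id == IMPALA_REQUEST_QUEUE_TOPIC;
  }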

The contract between statestore and subscriber is changed so that the
statestore can send concurrent Update() RPCs for disjoint sets of
topics. This required changes to the subscriber implementation, which
assumed that only one Update RPC would arrive at a time.
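
A minimal sketch of the new per-topic locking on the subscriber side, using
std::mutex in place of the boost primitives in the patch (names below are
illustrative): each topic gets its own update lock, so updates for disjoint
topic sets can proceed in parallel, while an update that finds a topic already
locked is skipped and retried by the statestore later. The patch additionally
sorts the topics by name to fix the lock acquisition order.

  #include <map>
  #include <mutex>
  #include <string>
  #include <vector>

  struct TopicRegistration {
    std::mutex update_lock;  // held while an update for this topic is processed
  };

  // Returns false if any updated topic is already being processed; the caller
  // would then report *skipped = true so the statestore retries the update.
  bool TryProcessUpdate(std::map<std::string, TopicRegistration>& topics,
                        const std::vector<std::string>& updated_topics) {
    std::vector<std::unique_lock<std::mutex>> held;
    for (const std::string& name : updated_topics) {
      std::unique_lock<std::mutex> l(topics[name].update_lock, std::try_to_lock);
      if (!l.owns_lock()) return false;
      held.push_back(std::move(l));
    }
    // ... apply the deltas for these topics while all per-topic locks are held ...
    return true;
  }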

It also changes the locking in the statestore so that the prioritized
update threads don't get stuck behind the catalog threads holding
'topic_lock_'. Specifically, it uses a reader-writer lock to protect
modification of the set of topics and a reader-writer lock per topic to
allow the topic data to be read by multiple threads concurrently.
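
A minimal sketch of that structure, written with std::shared_mutex rather than
the boost reader-writer types used in the patch (class and member names below
are illustrative): the topics map is locked exclusively only when a topic is
added, and each topic carries its own reader-writer lock, so a prioritized
update reading one topic is not serialized behind a catalog update writing
another.

  #include <map>
  #include <mutex>
  #include <shared_mutex>
  #include <string>

  struct Topic {
    std::shared_mutex lock;  // shared for building deltas, exclusive for Put()
    std::map<std::string, std::string> entries;
  };

  class TopicStore {
   public:
    // Assumes the topic was created at subscriber registration time, as in the
    // patch, so only a shared lock on the map is needed here.
    void Put(const std::string& topic, const std::string& key,
             const std::string& value) {
      std::shared_lock<std::shared_mutex> map_read(topics_map_lock_);
      Topic& t = topics_.at(topic);
      std::unique_lock<std::shared_mutex> topic_write(t.lock);
      t.entries[key] = value;
    }

   private:
    std::shared_mutex topics_map_lock_;  // exclusive only when adding topics
    std::map<std::string, Topic> topics_;
  };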

Added metrics to monitor the per-topic update interval.
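
A simplified sketch of what a per-topic interval measurement looks like (the
patch uses MonotonicStopWatch and StatsMetric<double>; the type below is
illustrative). The interval timer is reset only after the update has been
processed, so processing time is not counted as part of the interval.

  #include <chrono>

  struct UpdateIntervalTracker {
    std::chrono::steady_clock::time_point last_update_end =
        std::chrono::steady_clock::now();

    // Called when an update for this topic arrives; returns seconds since the
    // end of the previous update for the same topic.
    double IntervalSinceLastUpdate() const {
      return std::chrono::duration<double>(
          std::chrono::steady_clock::now() - last_update_end).count();
    }

    // Called after the update has been processed.
    void MarkUpdateEnd() { last_update_end = std::chrono::steady_clock::now(); }
  };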

Testing:
Ran core tests.

Inspected metrics on Impala daemons and saw that the membership and
request-queue processing-time metrics had more samples recorded than the
catalog topic's, reflecting the increased update frequency.

Ran under thread sanitizer, made sure no data races were reported in
Statestore or StatestoreSubscriber.

Change-Id: Ifc49c2d0f2a5bfad822545616b8c62b4b95dc210
Reviewed-on: http://gerrit.cloudera.org:8080/9123
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/26309141
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/26309141
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/26309141

Branch: refs/heads/2.x
Commit: 26309141d066706b07ecfb3572b5bc87eebfc736
Parents: 09962ad
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jan 24 09:37:36 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 14 23:10:16 2018 +0000

----------------------------------------------------------------------
 be/src/scheduling/admission-controller.cc       |   8 +-
 be/src/scheduling/admission-controller.h        |   3 -
 be/src/scheduling/scheduler-test-util.cc        |  10 +-
 be/src/scheduling/scheduler.cc                  |   7 +-
 be/src/scheduling/scheduler.h                   |   2 -
 be/src/service/impala-server.cc                 |   6 +-
 be/src/statestore/statestore-subscriber.cc      | 219 ++++++---
 be/src/statestore/statestore-subscriber.h       | 128 ++---
 be/src/statestore/statestore.cc                 | 490 ++++++++++++-------
 be/src/statestore/statestore.h                  | 295 +++++++----
 common/thrift/metrics.json                      |  25 +-
 .../custom_cluster/test_admission_controller.py |  20 +-
 tests/statestore/test_statestore.py             | 109 +++--
 www/statestore_subscribers.tmpl                 |   2 +
 www/statestore_topics.tmpl                      |   2 +
 15 files changed, 843 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index f43af2c..640a6af 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -50,8 +50,6 @@ int64_t GetProcMemLimit() {
   return ExecEnv::GetInstance()->process_mem_tracker()->limit();
 }
 
-const string AdmissionController::IMPALA_REQUEST_QUEUE_TOPIC("impala-request-queue");
-
 // Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>".
 // "!" is used because the backend id contains a colon, but it should not contain "!".
 // When parsing the topic key we need to be careful to find the last instance in
@@ -243,7 +241,7 @@ Status AdmissionController::Init() {
       &AdmissionController::DequeueLoop, this, &dequeue_thread_));
   StatestoreSubscriber::UpdateCallback cb =
     bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
-  Status status = subscriber_->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
+  Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
   if (!status.ok()) {
     status.AddDetail("AdmissionController failed to register request queue topic");
   }
@@ -632,7 +630,7 @@ void AdmissionController::UpdatePoolStats(
     AddPoolUpdates(subscriber_topic_updates);
 
     StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-        incoming_topic_deltas.find(IMPALA_REQUEST_QUEUE_TOPIC);
+        incoming_topic_deltas.find(Statestore::IMPALA_REQUEST_QUEUE_TOPIC);
     if (topic != incoming_topic_deltas.end()) {
       const TTopicDelta& delta = topic->second;
       // Delta and non-delta updates are handled the same way, except for a full update
@@ -799,7 +797,7 @@ void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
   if (pools_for_updates_.empty()) return;
   topic_updates->push_back(TTopicDelta());
   TTopicDelta& topic_delta = topic_updates->back();
-  topic_delta.topic_name = IMPALA_REQUEST_QUEUE_TOPIC;
+  topic_delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
   for (const string& pool_name: pools_for_updates_) {
     DCHECK(pool_stats_.find(pool_name) != pool_stats_.end());
     PoolStats* stats = GetPoolStats(pool_name);

http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 2830bee..3341b1b 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -206,9 +206,6 @@ class AdmissionController {
   class PoolStats;
   friend class PoolStats;
 
-  /// Statestore topic name.
-  static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
-
   /// Subscription manager used to handle admission control updates. This is not
   /// owned by this class.
   StatestoreSubscriber* subscriber_;

http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 05cfc42..7363cd3 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -467,7 +467,7 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
 void SchedulerWrapper::AddBackend(const Host& host) {
   // Add to topic delta
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
   AddHostToTopicDelta(host, &delta);
   SendTopicDelta(delta);
@@ -476,7 +476,7 @@ void SchedulerWrapper::AddBackend(const Host& host) {
 void SchedulerWrapper::RemoveBackend(const Host& host) {
   // Add deletion to topic delta
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
   TTopicItem item;
   item.__set_deleted(true);
@@ -487,7 +487,7 @@ void SchedulerWrapper::RemoveBackend(const Host& host) {
 
 void SchedulerWrapper::SendFullMembershipMap() {
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = false;
   for (const Host& host : plan_.cluster().hosts()) {
     if (host.be_port >= 0) AddHostToTopicDelta(host, &delta);
@@ -497,7 +497,7 @@ void SchedulerWrapper::SendFullMembershipMap() {
 
 void SchedulerWrapper::SendEmptyUpdate() {
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
   SendTopicDelta(delta);
 }
@@ -547,7 +547,7 @@ void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) {
   DCHECK(scheduler_ != nullptr);
   // Wrap in topic delta map.
   StatestoreSubscriber::TopicDeltaMap delta_map;
-  delta_map.emplace(Scheduler::IMPALA_MEMBERSHIP_TOPIC, delta);
+  delta_map.emplace(Statestore::IMPALA_MEMBERSHIP_TOPIC, delta);
 
   // Send to the scheduler.
   vector<TTopicDelta> dummy_result;

http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index e924f50..2ba1563 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -52,8 +52,6 @@ static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
 static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
 
-const string Scheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
-
 Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
     MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service)
   : executors_config_(std::make_shared<const BackendConfig>()),
@@ -86,7 +84,8 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
   if (statestore_subscriber_ != nullptr) {
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
-    Status status = statestore_subscriber_->AddTopic(IMPALA_MEMBERSHIP_TOPIC, true, cb);
+    Status status = statestore_subscriber_->AddTopic(
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb);
     if (!status.ok()) {
       status.AddDetail("Scheduler failed to register membership topic");
       return status;
@@ -123,7 +122,7 @@ void Scheduler::UpdateMembership(
     vector<TTopicDelta>* subscriber_topic_updates) {
   // First look to see if the topic(s) we're interested in have an update
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-      incoming_topic_deltas.find(IMPALA_MEMBERSHIP_TOPIC);
+      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
 
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;

http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 2fe90b8..b87b239 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -70,8 +70,6 @@ class SchedulerWrapper;
 ///           configuration.
 class Scheduler {
  public:
-  static const std::string IMPALA_MEMBERSHIP_TOPIC;
-
   /// List of server descriptors.
   typedef std::vector<TBackendDescriptor> BackendList;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index cff53e7..7cbbd16 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -362,7 +362,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
       this->MembershipCallback(state, topic_updates);
     };
     ABORT_IF_ERROR(
-        exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb));
+        exec_env->subscriber()->AddTopic(Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb));
 
     if (FLAGS_is_coordinator) {
       auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
@@ -1487,7 +1487,7 @@ void ImpalaServer::MembershipCallback(
   // TODO: Consider rate-limiting this. In the short term, best to have
   // statestore heartbeat less frequently.
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-      incoming_topic_deltas.find(Scheduler::IMPALA_MEMBERSHIP_TOPIC);
+      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
 
   if (topic != incoming_topic_deltas.end()) {
     const TTopicDelta& delta = topic->second;
@@ -1612,7 +1612,7 @@ void ImpalaServer::AddLocalBackendToStatestore(
   }
   subscriber_topic_updates->emplace_back(TTopicDelta());
   TTopicDelta& update = subscriber_topic_updates->back();
-  update.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  update.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   update.topic_entries.emplace_back(TTopicItem());
 
   TTopicItem& item = update.topic_entries.back();

http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index e58c177..a27d1ae 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -22,6 +22,7 @@
 
 #include <boost/algorithm/string/join.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread/lock_options.hpp>
 #include <boost/thread/shared_mutex.hpp>
 #include <gutil/strings/substitute.h>
 
@@ -39,6 +40,9 @@
 #include "common/names.h"
 
 using boost::posix_time::seconds;
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::try_to_lock;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
 using namespace strings;
@@ -65,6 +69,9 @@ const string STATESTORE_ID = "STATESTORE";
 // Template for metrics that measure the processing time for individual topics.
 const string CALLBACK_METRIC_PATTERN = "statestore-subscriber.topic-$0.processing-time-s";
 
+// Template for metrics that measure the interval between updates for individual topics.
+const string UPDATE_INTERVAL_METRIC_PATTERN = "statestore-subscriber.topic-$0.update-interval";
+
 // Duration, in ms, to sleep between attempts to reconnect to the
 // statestore after a failure.
 const int32_t SLEEP_INTERVAL_MS = 5000;
@@ -107,41 +114,42 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
       failure_detector_(new TimeoutFailureDetector(
           seconds(FLAGS_statestore_subscriber_timeout_seconds),
           seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
-      is_registered_(false),
       client_cache_(new StatestoreClientCache(FLAGS_statestore_subscriber_cnxn_attempts,
-          FLAGS_statestore_subscriber_cnxn_retry_interval_ms, 0, 0, "",
-          !FLAGS_ssl_client_ca_certificate.empty())),
-      metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")) {
+                FLAGS_statestore_subscriber_cnxn_retry_interval_ms, 0, 0, "",
+                !FLAGS_ssl_client_ca_certificate.empty())),
+      metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
+      is_registered_(false) {
   connected_to_statestore_metric_ =
       metrics_->AddProperty("statestore-subscriber.connected", false);
   last_recovery_duration_metric_ = metrics_->AddDoubleGauge(
       "statestore-subscriber.last-recovery-duration", 0.0);
   last_recovery_time_metric_ = metrics_->AddProperty<string>(
       "statestore-subscriber.last-recovery-time", "N/A");
-  topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
+  topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
       "statestore-subscriber.topic-update-interval-time");
-  topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
+  topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
       "statestore-subscriber.topic-update-duration");
-  heartbeat_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
+  heartbeat_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
       "statestore-subscriber.heartbeat-interval-time");
-
   registration_id_metric_ = metrics->AddProperty<string>(
       "statestore-subscriber.registration-id", "N/A");
-
   client_cache_->InitMetrics(metrics, "statestore-subscriber.statestore");
 }
 
 Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
     bool is_transient, const UpdateCallback& callback) {
-  lock_guard<mutex> l(lock_);
+  lock_guard<shared_mutex> exclusive_lock(lock_);
   if (is_registered_) return Status("Subscriber already started, can't add new topic");
-  Callbacks* cb = &(update_callbacks_[topic_id]);
-  cb->callbacks.push_back(callback);
-  if (cb->processing_time_metric == NULL) {
-    cb->processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
+  TopicRegistration& registration = topic_registrations_[topic_id];
+  registration.callbacks.push_back(callback);
+  if (registration.processing_time_metric == nullptr) {
+    registration.processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
         CALLBACK_METRIC_PATTERN, topic_id);
+    registration.update_interval_metric = StatsMetric<double>::CreateAndRegister(metrics_,
+        UPDATE_INTERVAL_METRIC_PATTERN, topic_id);
+    registration.update_interval_timer.Start();
   }
-  topic_registrations_[topic_id] = is_transient;
+  registration.is_transient = is_transient;
   return Status::OK();
 }
 
@@ -151,11 +159,10 @@ Status StatestoreSubscriber::Register() {
   RETURN_IF_ERROR(client_status);
 
   TRegisterSubscriberRequest request;
-  request.topic_registrations.reserve(update_callbacks_.size());
-  for (const UpdateCallbacks::value_type& topic: update_callbacks_) {
+  for (const auto& registration : topic_registrations_) {
     TTopicRegistration thrift_topic;
-    thrift_topic.topic_name = topic.first;
-    thrift_topic.is_transient = topic_registrations_[topic.first];
+    thrift_topic.topic_name = registration.first;
+    thrift_topic.is_transient = registration.second.is_transient;
     request.topic_registrations.push_back(thrift_topic);
   }
 
@@ -175,7 +182,6 @@ Status StatestoreSubscriber::Register() {
   } else {
     VLOG(1) << "No subscriber registration ID received from statestore";
   }
-  topic_update_interval_timer_.Start();
   heartbeat_interval_timer_.Start();
   return status;
 }
@@ -186,7 +192,7 @@ Status StatestoreSubscriber::Start() {
     // Take the lock to ensure that, if a topic-update is received during registration
     // (perhaps because Register() has succeeded, but we haven't finished setting up state
     // on the client side), UpdateState() will reject the message.
-    lock_guard<mutex> l(lock_);
+    lock_guard<shared_mutex> exclusive_lock(lock_);
     LOG(INFO) << "Starting statestore subscriber";
 
     // Backend must be started before registration
@@ -241,7 +247,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
     if (failure_detector_->GetPeerState(STATESTORE_ID) == FailureDetector::FAILED) {
       // When entering recovery mode, the class-wide lock_ is taken to
       // ensure mutual exclusion with any operations in flight.
-      lock_guard<mutex> l(lock_);
+      lock_guard<shared_mutex> exclusive_lock(lock_);
       MonotonicStopWatch recovery_timer;
       recovery_timer.Start();
       connected_to_statestore_metric_->SetValue(false);
@@ -313,74 +319,127 @@ void StatestoreSubscriber::Heartbeat(const RegistrationId& registration_id) {
 Status StatestoreSubscriber::UpdateState(const TopicDeltaMap& incoming_topic_deltas,
     const RegistrationId& registration_id, vector<TTopicDelta>* subscriber_topic_updates,
     bool* skipped) {
+  RETURN_IF_ERROR(CheckRegistrationId(registration_id));
+
+  // Put the updates into ascending order of topic name to match the lock acquisition
+  // order of TopicRegistration::update_lock.
+  vector<const TTopicDelta*> deltas_to_process;
+  for (auto& delta : incoming_topic_deltas) deltas_to_process.push_back(&delta.second);
+  sort(deltas_to_process.begin(), deltas_to_process.end(),
+      [](const TTopicDelta* left, const TTopicDelta* right) {
+        return left->topic_name < right->topic_name;
+      });
+  // Unique locks to hold the 'update_lock' for each entry in 'deltas_to_process'. Locks
+  // are held until we finish processing the update to prevent any races with concurrent
+  // updates for the same topic.
+  vector<unique_lock<mutex>> topic_update_locks(deltas_to_process.size());
+
   // We don't want to block here because this is an RPC, and delaying the return causes
   // the statestore to delay sending further messages. The only time that lock_ might be
-  // taken concurrently is if:
+  // taken exclusively is if the subscriber is recovering, and has the lock held during
+  // RecoveryModeChecker(). In this case we skip all topics and don't update any metrics.
   //
-  // a) another update is still being processed (i.e. is still in UpdateState()). This
-  // could happen only when the subscriber has re-registered, and the statestore is still
-  // sending an update for the previous registration. In this case, return OK but set
-  // *skipped = true to tell the statestore to retry this update in the future.
+  // UpdateState() may run concurrently with itself in two cases:
+  // a) disjoint sets of topics are being updated. In that case the updates can proceed
+  // concurrently.
+  // b) another update for the same topics is still being processed (i.e. is still in
+  // UpdateState()). This could happen only when the subscriber has re-registered, and
+  // the statestore is still sending an update for the previous registration. In this
+  // case, we notice that the per-topic 'update_lock' is held, skip processing all
+  // of the topic updates and set *skipped = true so that the statestore will retry this
+  // update in the future.
   //
-  // b) the subscriber is recovering, and has the lock held during
-  // RecoveryModeChecker(). Similarly, we set *skipped = true.
   // TODO: Consider returning an error in this case so that the statestore will eventually
   // stop sending updates even if re-registration fails.
-  try_mutex::scoped_try_lock l(lock_);
-  if (l) {
-    *skipped = false;
-    RETURN_IF_ERROR(CheckRegistrationId(registration_id));
-
-    // Only record updates received when not in recovery mode
-    topic_update_interval_metric_->Update(
-        topic_update_interval_timer_.Reset() / (1000.0 * 1000.0 * 1000.0));
-    MonotonicStopWatch sw;
-    sw.Start();
-
-    // Check the version ranges of all delta updates to ensure they can be applied
-    // to this subscriber. If any invalid ranges are found, request new update(s) with
-    // version ranges applicable to this subscriber.
-    bool found_unexpected_delta = false;
-    for (const TopicDeltaMap::value_type& delta: incoming_topic_deltas) {
-      TopicVersionMap::const_iterator itr = current_topic_versions_.find(delta.first);
-      if (itr != current_topic_versions_.end()) {
-        if (delta.second.is_delta && delta.second.from_version != itr->second) {
-          LOG(ERROR) << "Unexpected delta update to topic '" << delta.first << "' of "
-                     << "version range (" << delta.second.from_version << ":"
-                     << delta.second.to_version << "]. Expected delta start version: "
-                     << itr->second;
-
-          subscriber_topic_updates->push_back(TTopicDelta());
-          TTopicDelta& update = subscriber_topic_updates->back();
-          update.topic_name = delta.second.topic_name;
-          update.__set_from_version(itr->second);
-          found_unexpected_delta = true;
-        } else {
-          // Update the current topic version
-          current_topic_versions_[delta.first] = delta.second.to_version;
-        }
-      }
+  shared_lock<shared_mutex> l(lock_, try_to_lock);
+  if (!l.owns_lock()) {
+    *skipped = true;
+    return Status::OK();
+  }
+
+  // First, acquire all the topic locks and update the interval metrics
+  // Record the time we received the update before doing any processing to avoid including
+  // processing time in the interval metrics.
+  for (int i = 0; i < deltas_to_process.size(); ++i) {
+    const TTopicDelta& delta = *deltas_to_process[i];
+    auto it = topic_registrations_.find(delta.topic_name);
+    // Skip updates to unregistered topics.
+    if (it == topic_registrations_.end()) {
+      LOG(ERROR) << "Unexpected delta update for unregistered topic: "
+                 << delta.topic_name;
+      continue;
+    }
+    TopicRegistration& registration = it->second;
+    unique_lock<mutex> ul(registration.update_lock, try_to_lock);
+    if (!ul.owns_lock()) {
+      // Statestore sent out concurrent topic updates. Avoid blocking the RPC by skipping
+      // the topic.
+      LOG(ERROR) << "Could not acquire lock for topic " << delta.topic_name << ". "
+                 << "Skipping update.";
+      *skipped = true;
+      return Status::OK();
     }
+    double interval =
+        registration.update_interval_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0);
+    registration.update_interval_metric->Update(interval);
+    topic_update_interval_metric_->Update(interval);
 
-    // Skip calling the callbacks when an unexpected delta update is found.
-    if (!found_unexpected_delta) {
-      for (const UpdateCallbacks::value_type& callbacks: update_callbacks_) {
-        MonotonicStopWatch sw;
-        sw.Start();
-        for (const UpdateCallback& callback: callbacks.second.callbacks) {
-          // TODO: Consider filtering the topics to only send registered topics to
-          // callbacks
-          callback(incoming_topic_deltas, subscriber_topic_updates);
-        }
-        callbacks.second.processing_time_metric->Update(
-            sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-      }
+    // Hold onto lock until we've finished processing the update.
+    topic_update_locks[i].swap(ul);
+  }
+
+  MonotonicStopWatch sw;
+  sw.Start();
+  // Second, do the actual processing of topic updates that we validated and acquired
+  // locks for above.
+  for (int i = 0; i < deltas_to_process.size(); ++i) {
+    if (!topic_update_locks[i].owns_lock()) continue;
+
+    const TTopicDelta& delta = *deltas_to_process[i];
+    auto it = topic_registrations_.find(delta.topic_name);
+    DCHECK(it != topic_registrations_.end());
+    TopicRegistration& registration = it->second;
+    if (delta.is_delta && registration.current_topic_version != -1
+      && delta.from_version != registration.current_topic_version) {
+      // Received a delta update for the wrong version. Log an error and send back the
+      // expected version to the statestore to request a new update with the correct
+      // version range.
+      LOG(ERROR) << "Unexpected delta update to topic '" << delta.topic_name << "' of "
+                 << "version range (" << delta.from_version << ":"
+                 << delta.to_version << "]. Expected delta start version: "
+                 << registration.current_topic_version;
+
+      subscriber_topic_updates->push_back(TTopicDelta());
+      TTopicDelta& update = subscriber_topic_updates->back();
+      update.topic_name = delta.topic_name;
+      update.__set_from_version(registration.current_topic_version);
+      continue;
     }
-    sw.Stop();
-    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-  } else {
-    *skipped = true;
+    // The topic version in the update is valid, process the update.
+    MonotonicStopWatch update_callback_sw;
+    update_callback_sw.Start();
+    for (const UpdateCallback& callback : registration.callbacks) {
+      callback(incoming_topic_deltas, subscriber_topic_updates);
+    }
+    update_callback_sw.Stop();
+    registration.current_topic_version = delta.to_version;
+    registration.processing_time_metric->Update(
+        update_callback_sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
+  }
+
+  // Third and finally, reset the interval timers so they correctly measure the
+  // time between RPCs, excluding processing time.
+  for (int i = 0; i < deltas_to_process.size(); ++i) {
+    if (!topic_update_locks[i].owns_lock()) continue;
+
+    const TTopicDelta& delta = *deltas_to_process[i];
+    auto it = topic_registrations_.find(delta.topic_name);
+    DCHECK(it != topic_registrations_.end());
+    TopicRegistration& registration = it->second;
+    registration.update_interval_timer.Reset();
   }
+  sw.Stop();
+  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index e8b2204..f102cae 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -24,6 +24,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread/mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
 
 #include "statestore/statestore.h"
 #include "util/stopwatch.h"
@@ -84,9 +85,9 @@ class StatestoreSubscriber {
   /// Function called to update a service with new state. Called in a
   /// separate thread to the one in which it is registered.
   //
-  /// Every UpdateCallback is invoked every time that an update is
-  /// received from the statestore. Therefore the callback should not
-  /// assume that the TopicDeltaMap contains an entry for their
+  /// Every UpdateCallback is invoked every time that an update for the
+  /// topic is received from the statestore. Therefore the callback should
+  /// not assume that the TopicDeltaMap contains an entry for their
   /// particular topic of interest.
   //
   /// If a delta for a particular topic does not have the 'is_delta'
@@ -145,55 +146,7 @@ class StatestoreSubscriber {
   /// Thread in which RecoveryModeChecker runs.
   std::unique_ptr<Thread> recovery_mode_thread_;
 
-  /// Class-wide lock. Protects all subsequent members. Most private methods must
-  /// be called holding this lock; this is noted in the method comments.
-  boost::mutex lock_;
-
-  /// Set to true after Register(...) is successful, after which no
-  /// more topics may be subscribed to.
-  bool is_registered_;
-
-  /// Protects registration_id_. Must be taken after lock_ if both are to be taken
-  /// together.
-  boost::mutex registration_id_lock_;
-
-  /// Set during Register(), this is the unique ID of the current registration with the
-  /// statestore. If this subscriber must recover, or disconnects and then reconnects, the
-  /// registration_id_ will change after Register() is called again. This allows the
-  /// subscriber to reject communication from the statestore that pertains to a previous
-  /// registration.
-  RegistrationId registration_id_;
-
-  struct Callbacks {
-    /// Owned by the MetricGroup instance. Tracks how long callbacks took to process this
-    /// topic.
-    StatsMetric<double>* processing_time_metric;
-
-    /// List of callbacks to invoke for this topic.
-    std::vector<UpdateCallback> callbacks;
-  };
-
-  /// Mapping of topic ids to their associated callbacks. Because this mapping
-  /// stores a pointer to an UpdateCallback, memory errors will occur if an UpdateCallback
-  /// is deleted before being unregistered. The UpdateCallback destructor checks for
-  /// such problems, so that we will have an assertion failure rather than a memory error.
-  typedef boost::unordered_map<Statestore::TopicId, Callbacks> UpdateCallbacks;
-
-  /// Callback for all services that have registered for updates (indexed by the associated
-  /// SubscriptionId), and associated lock.
-  UpdateCallbacks update_callbacks_;
-
-  /// One entry for every topic subscribed to. The value is whether this subscriber
-  /// considers this topic to be 'transient', that is any updates it makes will be deleted
-  /// upon failure or disconnection.
-  std::map<Statestore::TopicId, bool> topic_registrations_;
-
-  /// Mapping of TopicId to the last version of the topic this subscriber successfully
-  /// processed.
-  typedef boost::unordered_map<Statestore::TopicId, int64_t> TopicVersionMap;
-  TopicVersionMap current_topic_versions_;
-
-  /// statestore client cache - only one client is ever used.
+  /// statestore client cache - only one client is ever used. Initialized in constructor.
   boost::scoped_ptr<StatestoreClientCache> client_cache_;
 
   /// MetricGroup instance that all metrics are registered in. Not owned by this class.
@@ -208,25 +161,82 @@ class StatestoreSubscriber {
   /// When the last recovery happened.
   StringProperty* last_recovery_time_metric_;
 
-  /// Accumulated statistics on the frequency of topic-update messages
+  /// Accumulated statistics on the frequency of topic-update messages, including samples
+  /// from all topics.
   StatsMetric<double>* topic_update_interval_metric_;
 
-  /// Tracks the time between topic-update mesages
-  MonotonicStopWatch topic_update_interval_timer_;
-
   /// Accumulated statistics on the time taken to process each topic-update message from
   /// the statestore (that is, to call all callbacks)
   StatsMetric<double>* topic_update_duration_metric_;
 
-  /// Tracks the time between heartbeat mesages
-  MonotonicStopWatch heartbeat_interval_timer_;
-
   /// Accumulated statistics on the frequency of heartbeat messages
   StatsMetric<double>* heartbeat_interval_metric_;
 
+  /// Tracks the time between heartbeat messages. Only updated by Heartbeat(), which
+  /// should not run concurrently with itself.
+  MonotonicStopWatch heartbeat_interval_timer_;
+
   /// Current registration ID, in string form.
   StringProperty* registration_id_metric_;
 
+  /// Object-wide lock that protects the below members. Must be held exclusively when
+  /// modifying the members, except when modifying TopicRegistrations - see
+  /// TopicRegistration::update_lock for details of locking there. Held in shared mode
+  /// when processing topic updates to prevent concurrent updates to other state. Most
+  /// private methods must be called holding this lock; this is noted in the method
+  /// comments.
+  boost::shared_mutex lock_;
+
+  /// Set to true after Register(...) is successful, after which no
+  /// more topics may be subscribed to.
+  bool is_registered_;
+
+  /// Protects registration_id_. Must be taken after lock_ if both are to be taken
+  /// together.
+  boost::mutex registration_id_lock_;
+
+  /// Set during Register(), this is the unique ID of the current registration with the
+  /// statestore. If this subscriber must recover, or disconnects and then reconnects, the
+  /// registration_id_ will change after Register() is called again. This allows the
+  /// subscriber to reject communication from the statestore that pertains to a previous
+  /// registration.
+  RegistrationId registration_id_;
+
+  struct TopicRegistration {
+    /// Held when processing a topic update. 'StatestoreSubscriber::lock_' must be held in
+    /// shared mode before acquiring this lock. If taking multiple update locks, they must
+    /// be acquired in ascending order of topic name.
+    boost::mutex update_lock;
+
+    /// Whether the subscriber considers this topic to be "transient", that is any updates
+    /// it makes will be deleted upon failure or disconnection.
+    bool is_transient = false;
+
+    /// The last version of the topic this subscriber processed.
+    /// -1 if no updates have been processed yet.
+    int64_t current_topic_version = -1;
+
+    /// Owned by the MetricGroup instance. Tracks how long callbacks took to process this
+    /// topic.
+    StatsMetric<double>* processing_time_metric = nullptr;
+
+    /// Tracks the time between topic-update messages to update 'update_interval_metric'.
+    MonotonicStopWatch update_interval_timer;
+
+    /// Owned by the MetricGroup instances. Tracks the time between the end of the last
+    /// update RPC for this topic and the start of the next.
+    StatsMetric<double>* update_interval_metric = nullptr;
+
+    /// Callback for all services that have registered for updates.
+    std::vector<UpdateCallback> callbacks;
+  };
+
+  /// One entry for every topic subscribed to. 'lock_' must be held exclusively to add or
+  /// remove entries from the map or held as a shared lock to lookup entries in the map.
+  /// Modifications to the contents of each TopicRegistration are protected by
+  /// TopicRegistration::update_lock.
+  boost::unordered_map<Statestore::TopicId, TopicRegistration> topic_registrations_;
+
   /// Subscriber thrift implementation, needs to access UpdateState
   friend class StatestoreSubscriberThriftIf;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/26309141/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 8f4ddbf..1072dee 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -17,6 +17,10 @@
 
 #include "statestore/statestore.h"
 
+#include <algorithm>
+#include <tuple>
+#include <utility>
+
 #include <boost/lexical_cast.hpp>
 #include <boost/thread.hpp>
 #include <thrift/Thrift.h>
@@ -36,6 +40,12 @@
 
 #include "common/names.h"
 
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::upgrade_lock;
+using boost::upgrade_to_unique_lock;
+using std::forward_as_tuple;
+using std::piecewise_construct;
 using namespace apache::thrift;
 using namespace impala;
 using namespace rapidjson;
@@ -50,6 +60,15 @@ DEFINE_int32(statestore_num_update_threads, 10, "(Advanced) Number of threads us
 DEFINE_int32(statestore_update_frequency_ms, 2000, "(Advanced) Frequency (in ms) with"
     " which the statestore sends topic updates to subscribers.");
 
+// Priority updates are sent out much more frequently. They are assumed to be small
+// amounts of data that take a small amount of time to process. Assuming each update
+// takes < 1ms to process, sending out an update every 100ms will consume less than
+// 1% of a CPU on each subscriber.
+DEFINE_int32(statestore_num_priority_update_threads, 10, "(Advanced) Number of threads "
+    "used to send prioritized topic updates in parallel to all registered subscribers.");
+DEFINE_int32(statestore_priority_update_frequency_ms, 100, "(Advanced) Frequency (in ms) "
+    "with which the statestore sends prioritized topic updates to subscribers.");
+
 DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads used to "
     " send heartbeats in parallel to all registered subscribers.");
 DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with"
@@ -87,18 +106,23 @@ const string STATESTORE_TOTAL_KEY_SIZE_BYTES = "statestore.total-key-size-bytes"
 const string STATESTORE_TOTAL_VALUE_SIZE_BYTES = "statestore.total-value-size-bytes";
 const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-bytes";
 const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
+const string STATESTORE_PRIORITY_UPDATE_DURATION =
+    "statestore.priority-topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 
 // Initial version for each Topic registered by a Subscriber. Generally, the Topic will
 // have a Version that is the MAX() of all entries in the Topic, but this initial
 // value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
 // between the case where a Topic is empty and the case where the Topic only contains
-// an item with the initial version.
+// an entry with the initial version.
 const Statestore::TopicEntry::Version Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0;
 
 // Updates or heartbeats that miss their deadline by this much are logged.
 const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
 
+const string Statestore::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
+const string Statestore::IMPALA_REQUEST_QUEUE_TOPIC("impala-request-queue");
+
 typedef ClientConnection<StatestoreSubscriberClientWrapper> StatestoreSubscriberConn;
 
 class StatestoreThriftIf : public StatestoreServiceIf {
@@ -127,46 +151,55 @@ void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes
   version_ = version;
 }
 
-Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
-    const Statestore::TopicEntry::Value& bytes, bool is_deleted) {
-  TopicEntryMap::iterator entry_it = entries_.find(key);
-  int64_t key_size_delta = 0;
-  int64_t value_size_delta = 0;
-  if (entry_it == entries_.end()) {
-    entry_it = entries_.insert(make_pair(key, TopicEntry())).first;
-    key_size_delta += key.size();
-  } else {
-    // Delete the old item from the version history. There is no need to search the
-    // version_history because there should only be at most a single item in the history
-    // at any given time
-    topic_update_log_.erase(entry_it->second.version());
-    value_size_delta -= entry_it->second.value().size();
+vector<Statestore::TopicEntry::Version> Statestore::Topic::Put(
+    const std::vector<TTopicItem>& entries) {
+  vector<Statestore::TopicEntry::Version> versions;
+  versions.reserve(entries.size());
+
+  // Acquire exclusive lock - we are modifying the topic.
+  lock_guard<shared_mutex> write_lock(lock_);
+  for (const TTopicItem& entry: entries) {
+    TopicEntryMap::iterator entry_it = entries_.find(entry.key);
+    int64_t key_size_delta = 0;
+    int64_t value_size_delta = 0;
+    if (entry_it == entries_.end()) {
+      entry_it = entries_.emplace(entry.key, TopicEntry()).first;
+      key_size_delta += entry.key.size();
+    } else {
+      // Delete the old entry from the version history. There is no need to search the
+      // version_history because there should only be at most a single entry in the
+      // history at any given time.
+      topic_update_log_.erase(entry_it->second.version());
+      value_size_delta -= entry_it->second.value().size();
+    }
+    value_size_delta += entry.value.size();
+
+    entry_it->second.SetValue(entry.value, ++last_version_);
+    entry_it->second.SetDeleted(entry.deleted);
+    topic_update_log_.emplace(entry_it->second.version(), entry.key);
+
+    total_key_size_bytes_ += key_size_delta;
+    total_value_size_bytes_ += value_size_delta;
+    DCHECK_GE(total_key_size_bytes_, static_cast<int64_t>(0));
+    DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
+    key_size_metric_->Increment(key_size_delta);
+    value_size_metric_->Increment(value_size_delta);
+    topic_size_metric_->Increment(key_size_delta + value_size_delta);
+    versions.push_back(entry_it->second.version());
   }
-  value_size_delta += bytes.size();
-
-  entry_it->second.SetValue(bytes, ++last_version_);
-  entry_it->second.SetDeleted(is_deleted);
-  topic_update_log_.insert(make_pair(entry_it->second.version(), key));
-
-  total_key_size_bytes_ += key_size_delta;
-  total_value_size_bytes_ += value_size_delta;
-  DCHECK_GE(total_key_size_bytes_, static_cast<int64_t>(0));
-  DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
-  key_size_metric_->Increment(key_size_delta);
-  value_size_metric_->Increment(value_size_delta);
-  topic_size_metric_->Increment(key_size_delta + value_size_delta);
-
-  return entry_it->second.version();
+  return versions;
 }
 
 void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
     const Statestore::TopicEntryKey& key) {
+  // Acquire exclusive lock - we are modifying the topic.
+  lock_guard<shared_mutex> write_lock(lock_);
   TopicEntryMap::iterator entry_it = entries_.find(key);
   if (entry_it != entries_.end() && entry_it->second.version() == version) {
     // Add a new entry with the the version history for this deletion and remove the old
     // entry
     topic_update_log_.erase(version);
-    topic_update_log_.insert(make_pair(++last_version_, key));
+    topic_update_log_.emplace(++last_version_, key);
     value_size_metric_->Increment(entry_it->second.value().size());
     topic_size_metric_->Increment(entry_it->second.value().size());
     entry_it->second.SetDeleted(true);
@@ -174,6 +207,79 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
   }
 }
 
+void Statestore::Topic::BuildDelta(const SubscriberId& subscriber_id,
+    TopicEntry::Version last_processed_version, TTopicDelta* delta) {
+  // If the subscriber version is > 0, send this update as a delta. Otherwise, this is
+  // a new subscriber so send them a non-delta update that includes all entries in the
+  // topic.
+  delta->is_delta = last_processed_version > Subscriber::TOPIC_INITIAL_VERSION;
+  delta->__set_from_version(last_processed_version);
+  {
+    // Acquire shared lock - we are not modifying the topic.
+    shared_lock<shared_mutex> read_lock(lock_);
+    TopicUpdateLog::const_iterator next_update =
+        topic_update_log_.upper_bound(last_processed_version);
+
+    uint64_t topic_size = 0;
+    for (; next_update != topic_update_log_.end(); ++next_update) {
+      TopicEntryMap::const_iterator itr = entries_.find(next_update->second);
+      DCHECK(itr != entries_.end());
+      const TopicEntry& topic_entry = itr->second;
+      // Don't send deleted entries for non-delta updates.
+      if (!delta->is_delta && topic_entry.is_deleted()) {
+        continue;
+      }
+      delta->topic_entries.push_back(TTopicItem());
+      TTopicItem& delta_entry = delta->topic_entries.back();
+      delta_entry.key = itr->first;
+      delta_entry.value = topic_entry.value();
+      delta_entry.deleted = topic_entry.is_deleted();
+      topic_size += delta_entry.key.size() + delta_entry.value.size();
+    }
+
+    if (!delta->is_delta &&
+        last_version_ > Subscriber::TOPIC_INITIAL_VERSION) {
+      VLOG_QUERY << "Preparing initial " << delta->topic_name
+                 << " topic update for " << subscriber_id << ". Size = "
+                 << PrettyPrinter::Print(topic_size, TUnit::BYTES);
+    }
+
+    if (topic_update_log_.size() > 0) {
+      // The largest version for this topic will be the last entry in the version history
+      // map.
+      delta->__set_to_version(topic_update_log_.rbegin()->first);
+    } else {
+      // There are no updates in the version history
+      delta->__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
+    }
+  }
+}
+void Statestore::Topic::ToJson(Document* document, Value* topic_json) {
+  // Acquire shared lock - we are not modifying the topic.
+  shared_lock<shared_mutex> read_lock(lock_);
+  Value topic_id(topic_id_.c_str(), document->GetAllocator());
+  topic_json->AddMember("topic_id", topic_id, document->GetAllocator());
+  topic_json->AddMember("num_entries",
+      static_cast<uint64_t>(entries_.size()),
+      document->GetAllocator());
+  topic_json->AddMember("version", last_version_, document->GetAllocator());
+
+  int64_t key_size = total_key_size_bytes_;
+  int64_t value_size = total_value_size_bytes_;
+  Value key_size_json(PrettyPrinter::Print(key_size, TUnit::BYTES).c_str(),
+      document->GetAllocator());
+  topic_json->AddMember("key_size", key_size_json, document->GetAllocator());
+  Value value_size_json(PrettyPrinter::Print(value_size, TUnit::BYTES).c_str(),
+      document->GetAllocator());
+  topic_json->AddMember("value_size", value_size_json, document->GetAllocator());
+  Value total_size_json(
+      PrettyPrinter::Print(key_size + value_size, TUnit::BYTES).c_str(),
+      document->GetAllocator());
+  topic_json->AddMember("total_size", total_size_json, document->GetAllocator());
+  topic_json->AddMember("prioritized", IsPrioritizedTopic(topic_id_),
+      document->GetAllocator());
+}
+
 Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
     const RegistrationId& registration_id, const TNetworkAddress& network_address,
     const vector<TTopicRegistration>& subscribed_topics)
@@ -181,47 +287,94 @@ Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
       registration_id_(registration_id),
       network_address_(network_address) {
   for (const TTopicRegistration& topic: subscribed_topics) {
-    TopicState topic_state;
-    topic_state.is_transient = topic.is_transient;
-    topic_state.last_version = TOPIC_INITIAL_VERSION;
-    subscribed_topics_[topic.topic_name] = topic_state;
+    GetTopicsMapForId(topic.topic_name)->emplace(piecewise_construct,
+        forward_as_tuple(topic.topic_name), forward_as_tuple(topic.is_transient));
   }
 }
 
-void Statestore::Subscriber::AddTransientUpdate(const TopicId& topic_id,
-    const TopicEntryKey& topic_key, TopicEntry::Version version) {
+bool Statestore::Subscriber::AddTransientEntries(const TopicId& topic_id,
+    const vector<TTopicItem>& entries,
+    const vector<TopicEntry::Version>& entry_versions) {
+  lock_guard<mutex> l(transient_entry_lock_);
+  DCHECK_EQ(entries.size(), entry_versions.size());
   // Only record the update if the topic is transient
-  const Topics::const_iterator topic_it = subscribed_topics_.find(topic_id);
-  DCHECK(topic_it != subscribed_topics_.end());
-  if (topic_it->second.is_transient == true) {
-    transient_entries_[make_pair(topic_id, topic_key)] = version;
+  Topics* subscribed_topics = GetTopicsMapForId(topic_id);
+  Topics::iterator topic_it = subscribed_topics->find(topic_id);
+  DCHECK(topic_it != subscribed_topics->end());
+  if (topic_it->second.is_transient) {
+    if (unregistered_) return false;
+    for (int i = 0; i < entries.size(); ++i) {
+      topic_it->second.transient_entries_[entries[i].key] = entry_versions[i];
+    }
   }
+  return true;
+}
+
+void Statestore::Subscriber::DeleteAllTransientEntries(TopicMap* global_topics) {
+  lock_guard<mutex> l(transient_entry_lock_);
+  for (const Topics* subscribed_topics :
+      {&priority_subscribed_topics_, &non_priority_subscribed_topics_}) {
+    for (const auto& topic : *subscribed_topics) {
+      auto global_topic_it = global_topics->find(topic.first);
+      DCHECK(global_topic_it != global_topics->end());
+      for (auto& transient_entry : topic.second.transient_entries_) {
+        global_topic_it->second.DeleteIfVersionsMatch(transient_entry.second,
+            transient_entry.first);
+      }
+    }
+  }
+  unregistered_ = true;
+}
+
+int64_t Statestore::Subscriber::NumTransientEntries() {
+  lock_guard<mutex> l(transient_entry_lock_);
+  int64_t num_entries = 0;
+  for (const Topics* subscribed_topics :
+      {&priority_subscribed_topics_, &non_priority_subscribed_topics_}) {
+    for (const auto& topic : *subscribed_topics) {
+      num_entries += topic.second.transient_entries_.size();
+    }
+  }
+  return num_entries;
 }
 
 Statestore::TopicEntry::Version Statestore::Subscriber::LastTopicVersionProcessed(
     const TopicId& topic_id) const {
-  Topics::const_iterator itr = subscribed_topics_.find(topic_id);
-  return itr == subscribed_topics_.end() ?
-      TOPIC_INITIAL_VERSION : itr->second.last_version;
+  const Topics& subscribed_topics = GetTopicsMapForId(topic_id);
+  Topics::const_iterator itr = subscribed_topics.find(topic_id);
+  return itr == subscribed_topics.end() ? TOPIC_INITIAL_VERSION
+                                        : itr->second.last_version.Load();
 }
 
 void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_id,
     TopicEntry::Version version) {
-  subscribed_topics_[topic_id].last_version = version;
+  // Safe to call concurrently for different topics because 'subscribed_topics' is not
+  // modified.
+  Topics* subscribed_topics = GetTopicsMapForId(topic_id);
+  Topics::iterator topic_it = subscribed_topics->find(topic_id);
+  DCHECK(topic_it != subscribed_topics->end());
+  topic_it->second.last_version.Store(version);
 }
 
 Statestore::Statestore(MetricGroup* metrics)
-  : exit_flag_(false),
-    subscriber_topic_update_threadpool_("statestore-update",
+  : subscriber_topic_update_threadpool_("statestore-update",
         "subscriber-update-worker",
         FLAGS_statestore_num_update_threads,
         FLAGS_statestore_max_subscribers,
-        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, false, _1, _2)),
+        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
+          UpdateKind::TOPIC_UPDATE, _1, _2)),
+    subscriber_priority_topic_update_threadpool_("statestore-priority-update",
+        "subscriber-priority-update-worker",
+        FLAGS_statestore_num_priority_update_threads,
+        FLAGS_statestore_max_subscribers,
+        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
+          UpdateKind::PRIORITY_TOPIC_UPDATE, _1, _2)),
     subscriber_heartbeat_threadpool_("statestore-heartbeat",
         "subscriber-heartbeat-worker",
         FLAGS_statestore_num_heartbeat_threads,
         FLAGS_statestore_max_subscribers,
-        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, _2)),
+        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
+          UpdateKind::HEARTBEAT, _1, _2)),
     update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000, "",
@@ -245,6 +398,8 @@ Statestore::Statestore(MetricGroup* metrics)
 
   topic_update_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_UPDATE_DURATION);
+  priority_topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(
+      metrics, STATESTORE_PRIORITY_UPDATE_DURATION);
   heartbeat_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_HEARTBEAT_DURATION);
 
@@ -254,6 +409,7 @@ Statestore::Statestore(MetricGroup* metrics)
 
 Status Statestore::Init() {
   RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
+  RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init());
   RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
   return Status::OK();
 }
@@ -275,43 +431,21 @@ void Statestore::RegisterWebpages(Webserver* webserver) {
 void Statestore::TopicsHandler(const Webserver::ArgumentMap& args,
     Document* document) {
   lock_guard<mutex> l(subscribers_lock_);
-  lock_guard<mutex> t(topic_lock_);
+  shared_lock<shared_mutex> t(topics_map_lock_);
 
   Value topics(kArrayType);
 
-  for (const TopicMap::value_type& topic: topics_) {
+  for (TopicMap::value_type& topic: topics_) {
     Value topic_json(kObjectType);
-
-    Value topic_id(topic.second.id().c_str(), document->GetAllocator());
-    topic_json.AddMember("topic_id", topic_id, document->GetAllocator());
-    topic_json.AddMember("num_entries",
-        static_cast<uint64_t>(topic.second.entries().size()),
-        document->GetAllocator());
-    topic_json.AddMember(
-        "version", topic.second.last_version(), document->GetAllocator());
-
+    topic.second.ToJson(document, &topic_json);
     SubscriberId oldest_subscriber_id;
     TopicEntry::Version oldest_subscriber_version =
         GetMinSubscriberTopicVersion(topic.first, &oldest_subscriber_id);
-
     topic_json.AddMember("oldest_version", oldest_subscriber_version,
         document->GetAllocator());
     Value oldest_id(oldest_subscriber_id.c_str(), document->GetAllocator());
     topic_json.AddMember("oldest_id", oldest_id, document->GetAllocator());
-
-    int64_t key_size = topic.second.total_key_size_bytes();
-    int64_t value_size = topic.second.total_value_size_bytes();
-    Value key_size_json(PrettyPrinter::Print(key_size, TUnit::BYTES).c_str(),
-        document->GetAllocator());
-    topic_json.AddMember("key_size", key_size_json, document->GetAllocator());
-    Value value_size_json(PrettyPrinter::Print(value_size, TUnit::BYTES).c_str(),
-        document->GetAllocator());
-    topic_json.AddMember("value_size", value_size_json, document->GetAllocator());
-    Value total_size_json(
-        PrettyPrinter::Print(key_size + value_size, TUnit::BYTES).c_str(),
-        document->GetAllocator());
-    topic_json.AddMember("total_size", total_size_json, document->GetAllocator());
-    topics.PushBack(topic_json, document->GetAllocator());
+    topics.PushBack(topic_json, document->GetAllocator());
   }
   document->AddMember("topics", topics, document->GetAllocator());
 }
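
For reference, the per-topic fields formerly assembled inline above (topic_id,
num_entries, version, key_size, value_size, total_size) now come from
Topic::ToJson(), which this hunk does not show. The following is only a
condensed, self-contained sketch of the same rapidjson pattern using a
simplified struct; sizes are left as raw byte counts here, whereas the
statestore pretty-prints them:

#include <cstdint>
#include <string>
#include <rapidjson/document.h>

struct TopicStats {
  std::string id;
  uint64_t num_entries;
  int64_t version;
  int64_t key_size_bytes;
  int64_t value_size_bytes;

  // 'out' must already be an object (kObjectType), as in TopicsHandler() above.
  void ToJson(rapidjson::Document* document, rapidjson::Value* out) const {
    rapidjson::Document::AllocatorType& alloc = document->GetAllocator();
    rapidjson::Value topic_id(id.c_str(), alloc);
    out->AddMember("topic_id", topic_id, alloc);
    out->AddMember("num_entries", num_entries, alloc);
    out->AddMember("version", version, alloc);
    out->AddMember("key_size", key_size_bytes, alloc);
    out->AddMember("value_size", value_size_bytes, alloc);
    out->AddMember("total_size", key_size_bytes + value_size_bytes, alloc);
  }
};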
@@ -330,11 +464,18 @@ void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args,
         document->GetAllocator());
     sub_json.AddMember("address", address, document->GetAllocator());
 
+    int64_t num_priority_topics =
+        subscriber.second->priority_subscribed_topics().size();
+    int64_t num_non_priority_topics =
+        subscriber.second->non_priority_subscribed_topics().size();
     sub_json.AddMember("num_topics",
-        static_cast<uint64_t>(subscriber.second->subscribed_topics().size()),
+        static_cast<uint64_t>(num_priority_topics + num_non_priority_topics),
+        document->GetAllocator());
+    sub_json.AddMember("num_priority_topics",
+        static_cast<uint64_t>(num_priority_topics),
         document->GetAllocator());
     sub_json.AddMember("num_transient",
-        static_cast<uint64_t>(subscriber.second->transient_entries().size()),
+        static_cast<uint64_t>(subscriber.second->NumTransientEntries()),
         document->GetAllocator());
 
     Value registration_id(PrintId(subscriber.second->registration_id()).c_str(),
@@ -378,14 +519,20 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
   // Create any new topics first, so that when the subscriber is first sent a topic update
   // by the worker threads its topics are guaranteed to exist.
   {
-    lock_guard<mutex> l(topic_lock_);
+    // Start with a shared read lock when checking the map. In the common case the topic
+    // will already exist, so we don't need to immediately get the exclusive lock and
+    // block other threads.
+    upgrade_lock<shared_mutex> topic_read_lock(topics_map_lock_);
     for (const TTopicRegistration& topic: topic_registrations) {
       TopicMap::iterator topic_it = topics_.find(topic.topic_name);
       if (topic_it == topics_.end()) {
+        // Upgrade to an exclusive lock when modifying the map.
+        upgrade_to_unique_lock<shared_mutex> topic_write_lock(topic_read_lock);
         LOG(INFO) << "Creating new topic: ''" << topic.topic_name
                   << "' on behalf of subscriber: '" << subscriber_id;
-        topics_.insert(make_pair(topic.topic_name, Topic(topic.topic_name,
-            key_size_metric_, value_size_metric_, topic_size_metric_)));
+        topics_.emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
+            forward_as_tuple(topic.topic_name, key_size_metric_, value_size_metric_,
+            topic_size_metric_));
       }
     }
   }
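
The read-then-upgrade pattern above is worth calling out: registration normally
only needs to confirm that a topic exists, so it takes an upgradable lock that
coexists with the shared locks held by the update threads, and escalates to an
exclusive lock only for the rare insertion. A minimal, self-contained sketch of
the same idiom, assuming Boost's shared_mutex (the names g_topics,
g_topics_lock and EnsureTopicExists are illustrative, not Impala code):

#include <map>
#include <string>
#include <boost/thread/locks.hpp>
#include <boost/thread/shared_mutex.hpp>

static boost::shared_mutex g_topics_lock;
static std::map<std::string, int> g_topics;

void EnsureTopicExists(const std::string& name) {
  // An upgrade lock coexists with shared_lock readers, but only one thread may
  // hold it at a time, which is what makes the later escalation deadlock-free.
  boost::upgrade_lock<boost::shared_mutex> read_lock(g_topics_lock);
  if (g_topics.find(name) == g_topics.end()) {
    // Take exclusive access only while actually modifying the map.
    boost::upgrade_to_unique_lock<boost::shared_mutex> write_lock(read_lock);
    g_topics.emplace(name, 0);
  }
}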
@@ -400,7 +547,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     UUIDToTUniqueId(subscriber_uuid_generator_(), registration_id);
     shared_ptr<Subscriber> current_registration(
         new Subscriber(subscriber_id, *registration_id, location, topic_registrations));
-    subscribers_.insert(make_pair(subscriber_id, current_registration));
+    subscribers_.emplace(subscriber_id, current_registration);
     failure_detector_->UpdateHeartbeat(
         PrintId(current_registration->registration_id()), true);
     num_subscribers_metric_->SetValue(subscribers_.size());
@@ -409,6 +556,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     // Add the subscriber to the update queue, with an immediate schedule.
     ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
+    RETURN_IF_ERROR(OfferUpdate(update, &subscriber_priority_topic_update_threadpool_));
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
   }
 
@@ -428,7 +576,8 @@ bool Statestore::FindSubscriber(const SubscriberId& subscriber_id,
   return true;
 }
 
-Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) {
+Status Statestore::SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kind,
+    bool* update_skipped) {
   // Time any successful RPCs (i.e. those for which UpdateState() completed, even though
   // it may have returned an error.)
   MonotonicStopWatch sw;
@@ -436,7 +585,12 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
   // First thing: make a list of updates to send
   TUpdateStateRequest update_state_request;
-  GatherTopicUpdates(*subscriber, &update_state_request);
+  GatherTopicUpdates(*subscriber, update_kind, &update_state_request);
+  // 'subscriber' may not be subscribed to any updates of 'update_kind'.
+  if (update_state_request.topic_deltas.empty()) {
+    *update_skipped = false;
+    return Status::OK();
+  }
 
   // Set the expected registration ID, so that the subscriber can reject this update if
   // they have moved on to a new registration instance.
@@ -452,9 +606,12 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
   RETURN_IF_ERROR(client.DoRpc(
       &StatestoreSubscriberClientWrapper::UpdateState, update_state_request, &response));
 
+  StatsMetric<double>* update_duration_metric =
+      update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE ?
+      priority_topic_update_duration_metric_ : topic_update_duration_metric_;
   status = Status(response.status);
   if (!status.ok()) {
-    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
+    update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
     return status;
   }
 
@@ -463,7 +620,7 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
     // The subscriber skipped processing this update. We don't consider this a failure
     // - subscribers can decide what they do with any update - so, return OK and set
     // update_skipped so the caller can compensate.
-    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
+    update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
     return Status::OK();
   }
 
@@ -476,7 +633,7 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
   // Thirdly: perform any / all updates returned by the subscriber
   {
-    lock_guard<mutex> l(topic_lock_);
+    shared_lock<shared_mutex> l(topics_map_lock_);
     for (const TTopicDelta& update: response.topic_updates) {
       TopicMap::iterator topic_it = topics_.find(update.topic_name);
       if (topic_it == topics_.end()) {
@@ -498,80 +655,49 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
         subscriber->SetLastTopicVersionProcessed(topic_it->first, update.from_version);
       }
 
-      Topic* topic = &topic_it->second;
-      for (const TTopicItem& item: update.topic_entries) {
-        subscriber->AddTransientUpdate(update.topic_name, item.key,
-            topic->Put(item.key, item.value, item.deleted));
+      Topic& topic = topic_it->second;
+      // Update the topic and add transient entries separately to avoid holding both
+      // locks at the same time and preventing concurrent topic updates.
+      vector<TopicEntry::Version> entry_versions = topic.Put(update.topic_entries);
+      if (!subscriber->AddTransientEntries(
+          update.topic_name, update.topic_entries, entry_versions)) {
+        // Subscriber was unregistered - clean up the transient entries.
+        for (int i = 0; i < update.topic_entries.size(); ++i) {
+          topic.DeleteIfVersionsMatch(entry_versions[i], update.topic_entries[i].key);
+        }
       }
     }
   }
-  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
+  update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
   return Status::OK();
 }
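
The unregistration race handled above relies on Topic::Put() returning the
version assigned to each entry, so that a later DeleteIfVersionsMatch() only
undoes the write if nothing newer has replaced it. A stripped-down,
self-contained sketch of that version-guarded compensation (illustrative names
and simplified semantics: here the entry is simply erased, while the statestore
also has to make the deletion visible to other subscribers):

#include <cstdint>
#include <map>
#include <string>

struct Versioned { std::string value; int64_t version; };
static std::map<std::string, Versioned> g_entries;
static int64_t g_next_version = 1;

// Write an entry and return the version assigned to this particular write.
int64_t Put(const std::string& key, const std::string& value) {
  int64_t v = g_next_version++;
  g_entries[key] = {value, v};
  return v;
}

// Undo a Put() only if no newer write has replaced it in the meantime.
void DeleteIfVersionsMatch(int64_t version, const std::string& key) {
  auto it = g_entries.find(key);
  if (it != g_entries.end() && it->second.version == version) g_entries.erase(it);
}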
 
 void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
-    TUpdateStateRequest* update_state_request) {
+    UpdateKind update_kind, TUpdateStateRequest* update_state_request) {
   {
-    lock_guard<mutex> l(topic_lock_);
-    for (const Subscriber::Topics::value_type& subscribed_topic:
-         subscriber.subscribed_topics()) {
-      TopicMap::const_iterator topic_it = topics_.find(subscribed_topic.first);
+    DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
+        || update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE)
+        << static_cast<int>(update_kind);
+    const bool is_priority = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE;
+    const Subscriber::Topics& subscribed_topics = is_priority
+        ? subscriber.priority_subscribed_topics()
+        : subscriber.non_priority_subscribed_topics();
+    shared_lock<shared_mutex> l(topics_map_lock_);
+    for (const auto& subscribed_topic: subscribed_topics) {
+      auto topic_it = topics_.find(subscribed_topic.first);
       DCHECK(topic_it != topics_.end());
-
       TopicEntry::Version last_processed_version =
           subscriber.LastTopicVersionProcessed(topic_it->first);
-      const Topic& topic = topic_it->second;
 
       TTopicDelta& topic_delta =
           update_state_request->topic_deltas[subscribed_topic.first];
       topic_delta.topic_name = subscribed_topic.first;
-
-      // If the subscriber version is > 0, send this update as a delta. Otherwise, this is
-      // a new subscriber so send them a non-delta update that includes all items in the
-      // topic.
-      topic_delta.is_delta = last_processed_version > Subscriber::TOPIC_INITIAL_VERSION;
-      topic_delta.__set_from_version(last_processed_version);
-
-      TopicUpdateLog::const_iterator next_update =
-          topic.topic_update_log().upper_bound(last_processed_version);
-
-      uint64_t topic_size = 0;
-      for (; next_update != topic.topic_update_log().end(); ++next_update) {
-        TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second);
-        DCHECK(itr != topic.entries().end());
-        const TopicEntry& topic_entry = itr->second;
-        // Don't send deleted entries for non-delta updates.
-        if (!topic_delta.is_delta && topic_entry.is_deleted()) {
-          continue;
-        }
-        topic_delta.topic_entries.push_back(TTopicItem());
-        TTopicItem& topic_item = topic_delta.topic_entries.back();
-        topic_item.key = itr->first;
-        topic_item.value = topic_entry.value();
-        topic_item.deleted = topic_entry.is_deleted();
-        topic_size += topic_item.key.size() + topic_item.value.size();
-      }
-
-      if (!topic_delta.is_delta &&
-          topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
-        VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
-                   << " topic update for " << subscriber.id() << ". Size = "
-                   << PrettyPrinter::Print(topic_size, TUnit::BYTES);
-      }
-
-      if (topic.topic_update_log().size() > 0) {
-        // The largest version for this topic will be the last item in the version history
-        // map.
-        topic_delta.__set_to_version(topic.topic_update_log().rbegin()->first);
-      } else {
-        // There are no updates in the version history
-        topic_delta.__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
-      }
+      topic_it->second.BuildDelta(subscriber.id(), last_processed_version, &topic_delta);
     }
   }
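
The body of the old per-topic loop removed above is what Topic::BuildDelta()
now encapsulates. A condensed, self-contained sketch of that logic with
simplified types (logging and size accounting omitted; these are not the actual
Impala classes):

#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct Entry { std::string value; bool deleted; };
struct Item { std::string key; std::string value; bool deleted; };
struct Delta {
  bool is_delta;
  int64_t from_version;
  int64_t to_version;
  std::vector<Item> entries;
};

constexpr int64_t TOPIC_INITIAL_VERSION = 0;

// 'update_log' maps version -> key in version order; 'entries' maps key -> entry.
Delta BuildDelta(const std::map<int64_t, std::string>& update_log,
    const std::map<std::string, Entry>& entries, int64_t last_processed_version) {
  Delta delta;
  // A subscriber that has processed version > 0 gets a delta; a new subscriber
  // gets a full (non-delta) update containing every live entry.
  delta.is_delta = last_processed_version > TOPIC_INITIAL_VERSION;
  delta.from_version = last_processed_version;
  for (auto it = update_log.upper_bound(last_processed_version);
       it != update_log.end(); ++it) {
    const Entry& entry = entries.at(it->second);
    // Deleted entries are only interesting to subscribers applying a delta.
    if (!delta.is_delta && entry.deleted) continue;
    delta.entries.push_back({it->second, entry.value, entry.deleted});
  }
  // The largest version in the update log is the version this delta brings the
  // subscriber up to; an empty log means the topic has never been written.
  delta.to_version = update_log.empty() ? TOPIC_INITIAL_VERSION
                                        : update_log.rbegin()->first;
  return delta;
}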
 
   // Fill in the min subscriber topic version. This must be done after releasing
-  // topic_lock_.
+  // topics_map_lock_.
   lock_guard<mutex> l(subscribers_lock_);
   typedef map<TopicId, TTopicDelta> TopicDeltaMap;
   for (TopicDeltaMap::value_type& topic_delta: update_state_request->topic_deltas) {
@@ -586,8 +712,8 @@ Statestore::TopicEntry::Version Statestore::GetMinSubscriberTopicVersion(
   bool found = false;
   // Find the minimum version processed for this topic across all topic subscribers.
   for (const SubscriberMap::value_type& subscriber: subscribers_) {
-    if (subscriber.second->subscribed_topics().find(topic_id) !=
-        subscriber.second->subscribed_topics().end()) {
+    auto subscribed_topics = subscriber.second->GetTopicsMapForId(topic_id);
+    if (subscribed_topics->find(topic_id) != subscribed_topics->end()) {
       found = true;
       TopicEntry::Version last_processed_version =
           subscriber.second->LastTopicVersionProcessed(topic_id);
@@ -600,15 +726,33 @@ Statestore::TopicEntry::Version Statestore::GetMinSubscriberTopicVersion(
   return found ? min_topic_version : Subscriber::TOPIC_INITIAL_VERSION;
 }
 
-bool Statestore::ShouldExit() {
-  lock_guard<mutex> l(exit_flag_lock_);
-  return exit_flag_;
+bool Statestore::IsPrioritizedTopic(const string& topic) {
+  return topic == IMPALA_MEMBERSHIP_TOPIC || topic == IMPALA_REQUEST_QUEUE_TOPIC;
 }
 
-void Statestore::SetExitFlag() {
-  lock_guard<mutex> l(exit_flag_lock_);
-  exit_flag_ = true;
-  subscriber_topic_update_threadpool_.Shutdown();
+const char* Statestore::GetUpdateKindName(UpdateKind kind) {
+  switch (kind) {
+    case UpdateKind::TOPIC_UPDATE:
+      return "topic update";
+    case UpdateKind::PRIORITY_TOPIC_UPDATE:
+      return "priority topic update";
+    case UpdateKind::HEARTBEAT:
+      return "heartbeat";
+  }
+  DCHECK(false);
+  return "";
+}
+
+ThreadPool<Statestore::ScheduledSubscriberUpdate>* Statestore::GetThreadPool(
+    UpdateKind kind) {
+  switch (kind) {
+    case UpdateKind::TOPIC_UPDATE:
+      return &subscriber_topic_update_threadpool_;
+    case UpdateKind::PRIORITY_TOPIC_UPDATE:
+      return &subscriber_priority_topic_update_threadpool_;
+    case UpdateKind::HEARTBEAT:
+      return &subscriber_heartbeat_threadpool_;
+  }
+  DCHECK(false);
+  return nullptr;
 }
 
 Status Statestore::SendHeartbeat(Subscriber* subscriber) {
@@ -630,8 +774,9 @@ Status Statestore::SendHeartbeat(Subscriber* subscriber) {
   return Status::OK();
 }
 
-void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
+void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
     const ScheduledSubscriberUpdate& update) {
+  const bool is_heartbeat = update_kind == UpdateKind::HEARTBEAT;
   int64_t update_deadline = update.deadline;
   shared_ptr<Subscriber> subscriber;
   // Check if the subscriber has re-registered, in which case we can ignore
@@ -639,7 +784,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
   if (!FindSubscriber(update.subscriber_id, update.registration_id, &subscriber)) {
     return;
   }
-  const string hb_type = is_heartbeat ? "heartbeat" : "topic update";
+  const char* update_kind_str = GetUpdateKindName(update_kind);
   if (update_deadline != 0) {
     // Wait until deadline.
     int64_t diff_ms = update_deadline - UnixMillis();
@@ -654,7 +799,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
       return;
     }
     diff_ms = std::abs(diff_ms);
-    VLOG(3) << "Sending " << hb_type << " message to: " << update.subscriber_id
+    VLOG(3) << "Sending " << update_kind_str << " message to: " << update.subscriber_id
         << " (deadline accuracy: " << diff_ms << "ms)";
 
     if (diff_ms > DEADLINE_MISS_THRESHOLD_MS && is_heartbeat) {
@@ -663,7 +808,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
           "consider increasing --statestore_heartbeat_frequency_ms (currently $3) on "
           "this Statestore, and --statestore_subscriber_timeout_seconds "
           "on subscribers (Impala daemons and the Catalog Server)",
-          update.subscriber_id, hb_type, diff_ms,
+          update.subscriber_id, update_kind_str, diff_ms,
           FLAGS_statestore_heartbeat_frequency_ms);
       LOG(WARNING) << msg;
     }
@@ -674,7 +819,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
   } else {
     // The first update is scheduled immediately and has a deadline of 0. There's no need
     // to wait.
-    VLOG(3) << "Initial " << hb_type << " message for: " << update.subscriber_id;
+    VLOG(3) << "Initial " << update_kind_str << " message for: " << update.subscriber_id;
   }
 
   // Send the right message type, and compute the next deadline
@@ -692,18 +837,21 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
     deadline_ms = UnixMillis() + FLAGS_statestore_heartbeat_frequency_ms;
   } else {
     bool update_skipped;
-    status = SendTopicUpdate(subscriber.get(), &update_skipped);
+    status = SendTopicUpdate(subscriber.get(), update_kind, &update_skipped);
     if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
       // Add details to status to make it more useful, while preserving the stack
       status.AddDetail(Substitute(
           "Subscriber $0 timed-out during topic-update RPC. Timeout is $1s.",
           subscriber->id(), FLAGS_statestore_update_tcp_timeout_seconds));
     }
-    // If the subscriber responded that it skipped the last update sent, we assume that
-    // it was busy doing something else, and back off slightly before sending another.
-    int64_t update_interval = update_skipped ?
-        (2 * FLAGS_statestore_update_frequency_ms) :
-        FLAGS_statestore_update_frequency_ms;
+    // If the subscriber responded that it skipped a topic in the last update sent,
+    // we assume that it was busy doing something else, and back off slightly before
+    // sending another.
+    int64_t update_frequency = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE
+        ? FLAGS_statestore_priority_update_frequency_ms
+        : FLAGS_statestore_update_frequency_ms;
+    int64_t update_interval =
+        update_skipped ? (2 * update_frequency) : update_frequency;
     deadline_ms = UnixMillis() + update_interval;
   }
 
@@ -715,7 +863,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
     if (it == subscribers_.end() ||
         it->second->registration_id() != update.registration_id) return;
     if (!status.ok()) {
-      LOG(INFO) << "Unable to send " << hb_type << " message to subscriber "
+      LOG(INFO) << "Unable to send " << update_kind_str << " message to subscriber "
                 << update.subscriber_id << ", received error: " << status.GetDetail();
     }
 
@@ -736,10 +884,9 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
       VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " deadline for: "
               << subscriber->id() << " is in " << deadline_ms << "ms";
       status = OfferUpdate(ScheduledSubscriberUpdate(deadline_ms, subscriber->id(),
-          subscriber->registration_id()), is_heartbeat ?
-          &subscriber_heartbeat_threadpool_ : &subscriber_topic_update_threadpool_);
+          subscriber->registration_id()), GetThreadPool(update_kind));
       if (!status.ok()) {
-        LOG(INFO) << "Unable to send next " << (is_heartbeat ? "heartbeat" : "update")
+        LOG(INFO) << "Unable to send next " << update_kind_str
                   << " message to subscriber '" << subscriber->id() << "': "
                   << status.GetDetail();
       }
@@ -763,14 +910,11 @@ void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
   failure_detector_->EvictPeer(PrintId(subscriber->registration_id()));
 
   // Delete all transient entries
-  lock_guard<mutex> topic_lock(topic_lock_);
-  for (Statestore::Subscriber::TransientEntryMap::value_type entry:
-       subscriber->transient_entries()) {
-    Statestore::TopicMap::iterator topic_it = topics_.find(entry.first.first);
-    DCHECK(topic_it != topics_.end());
-    topic_it->second.DeleteIfVersionsMatch(entry.second, // version
-        entry.first.second); // key
+  {
+    shared_lock<shared_mutex> topic_lock(topics_map_lock_);
+    subscriber->DeleteAllTransientEntries(&topics_);
   }
+
   num_subscribers_metric_->Increment(-1L);
   subscriber_set_metric_->Remove(subscriber->id());
   subscribers_.erase(subscriber->id());
@@ -778,4 +922,6 @@ void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
 
 void Statestore::MainLoop() {
   subscriber_topic_update_threadpool_.Join();
+  subscriber_priority_topic_update_threadpool_.Join();
+  subscriber_heartbeat_threadpool_.Join();
 }