Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/14 23:21:13 UTC

[1/6] impala git commit: KUDU-2296: Fix deserialization of messages larger than 64MB

Repository: impala
Updated Branches:
  refs/heads/master f9971f81a -> b0d3433e3


KUDU-2296: Fix deserialization of messages larger than 64MB

Protobuf's CodedInputStream has a 64MB total byte limit by
default. When deserializing messages larger than this,
ParseMessage() hits the limit and mistakenly thinks that
the packet is too short. This issue is dormant
due to Kudu's default rpc_max_message_size of 50MB.
However, Impala will be using a larger value for
rpc_max_message_size and requires this fix.

The fix is to override the default 64MB limit by calling
CodedInputStream::SetTotalBytesLimit() with the buffer's
size.
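
A minimal standalone sketch of the same idea (illustrative only; the actual
Kudu change is in the diff below). It assumes a protobuf release where
SetTotalBytesLimit() takes a limit plus a warning threshold, as the change
below does, and the helper name ParseLargeMessage is invented for the example:

#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/message.h>

// Parse a message from 'data'/'size' without tripping CodedInputStream's
// default 64MB total-bytes limit.
bool ParseLargeMessage(const uint8_t* data, int size,
                       google::protobuf::Message* msg) {
  google::protobuf::io::CodedInputStream in(data, size);
  // Raise the limit to the buffer size so messages larger than 64MB parse.
  // Passing -1 for the warning threshold disables the warning, matching the
  // Kudu change below.
  in.SetTotalBytesLimit(size, -1);
  return msg->ParseFromCodedStream(&in);
}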

Change-Id: I57d3f3ca6ec0aa8be0e67e6a13c4b560c9d2c63a
Reviewed-on: http://gerrit.cloudera.org:8080/9312
Tested-by: Kudu Jenkins
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-on: http://gerrit.cloudera.org:8080/9313
Reviewed-by: Joe McDonnell <jo...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d7f2ce10
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d7f2ce10
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d7f2ce10

Branch: refs/heads/master
Commit: d7f2ce10b4b77258640be3dfba3bee6cb5031555
Parents: f9971f8
Author: Joe McDonnell <jo...@cloudera.com>
Authored: Tue Feb 13 14:43:19 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 14 03:01:59 2018 +0000

----------------------------------------------------------------------
 be/src/kudu/rpc/serialization.cc | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d7f2ce10/be/src/kudu/rpc/serialization.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/rpc/serialization.cc b/be/src/kudu/rpc/serialization.cc
index dbb0fc5..fbc05bc 100644
--- a/be/src/kudu/rpc/serialization.cc
+++ b/be/src/kudu/rpc/serialization.cc
@@ -114,6 +114,10 @@ Status ParseMessage(const Slice& buf,
     << "Got mis-sized buffer: " << KUDU_REDACT(buf.ToDebugString());
 
   CodedInputStream in(buf.data(), buf.size());
+  // Protobuf enforces a 64MB total bytes limit on CodedInputStream by default.
+  // Override this default with the actual size of the buffer to allow messages
+  // larger than 64MB.
+  in.SetTotalBytesLimit(buf.size(), -1);
   in.Skip(kMsgLengthPrefixLength);
 
   uint32_t header_len;


[6/6] impala git commit: IMPALA-4953, IMPALA-6437: separate AC/scheduler from catalog topic updates

Posted by ta...@apache.org.
IMPALA-4953, IMPALA-6437: separate AC/scheduler from catalog topic updates

This adds a set of "prioritized" statestore topics that are small but
important to deliver in a timely manner. These are delivered more
frequently by a separate thread pool to reduce the window for stale
admission control and scheduling information.

The contract between statestore and subscriber is changed so that the
statestore can send concurrent Update() RPCs for disjoint sets of
topics. This required changes to the subscriber implementation, which
assumed that only one Update RPC would arrive at a time.
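
A rough sketch of how the subscriber side can tolerate this (simplified and
illustrative; the real logic is in the statestore-subscriber.cc diff below,
and the type and function names here are invented for the example):

#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/shared_mutex.hpp>

struct TopicState {
  boost::mutex update_lock;  // serializes updates to a single topic
  // ... callbacks, last processed version, metrics ...
};

// Returns false to signal "skipped" so the statestore retries later.
bool HandleUpdate(boost::shared_mutex& subscriber_lock, TopicState& topic) {
  // Shared mode: updates for disjoint topics proceed in parallel; only
  // recovery/registration takes subscriber_lock exclusively.
  boost::shared_lock<boost::shared_mutex> l(subscriber_lock, boost::try_to_lock);
  if (!l.owns_lock()) return false;

  // Per-topic try-lock: if an update for the same topic (e.g. from a stale
  // registration) is still in flight, skip instead of blocking the RPC.
  boost::unique_lock<boost::mutex> ul(topic.update_lock, boost::try_to_lock);
  if (!ul.owns_lock()) return false;

  // ... apply the delta and invoke the registered callbacks ...
  return true;
}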

It also changes the locking in the statestore so that the prioritized
update threads don't get stuck behind the catalog threads holding
'topic_lock_'. Specifically, it uses a reader-writer lock to protect
modification of the set of topics and a reader-writer lock per topic to
allow the topic data to be read by multiple threads concurrently.
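
A condensed sketch of that locking scheme (illustrative; the real structure
is in the statestore.h/statestore.cc diff below, and the class and member
names here are simplified):

#include <map>
#include <string>
#include <boost/thread/locks.hpp>
#include <boost/thread/shared_mutex.hpp>

struct Topic {
  boost::shared_mutex lock;  // shared for reads, exclusive for Put()/deletes
  // ... entries, version history, metrics ...
};

class StatestoreSketch {
  boost::shared_mutex topics_map_lock_;  // protects the set of topics
  std::map<std::string, Topic> topics_;

 public:
  Topic* AddTopic(const std::string& id) {
    // Exclusive: only taken when the set of topics changes.
    boost::lock_guard<boost::shared_mutex> write_lock(topics_map_lock_);
    return &topics_[id];
  }

  Topic* FindTopic(const std::string& id) {
    // Shared: priority and catalog update threads can look up topics
    // concurrently instead of serializing on one big topic lock.
    boost::shared_lock<boost::shared_mutex> read_lock(topics_map_lock_);
    auto it = topics_.find(id);
    return it == topics_.end() ? nullptr : &it->second;
  }
};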

Added metrics to monitor the per-topic update interval.

Testing:
Ran core tests.

Inspected metrics on Impala daemons, saw that membership and request
queue processing times had more samples recorded than the catalog
topic, reflecting the increased frequency.

Ran under thread sanitizer, made sure no data races were reported in
Statestore or StatestoreSubscriber.

Change-Id: Ifc49c2d0f2a5bfad822545616b8c62b4b95dc210
Reviewed-on: http://gerrit.cloudera.org:8080/9123
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/b0d3433e
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/b0d3433e
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/b0d3433e

Branch: refs/heads/master
Commit: b0d3433e36d7942b3e10bddc310287266240810b
Parents: e117365
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jan 24 09:37:36 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 14 22:44:40 2018 +0000

----------------------------------------------------------------------
 be/src/scheduling/admission-controller.cc       |   8 +-
 be/src/scheduling/admission-controller.h        |   3 -
 be/src/scheduling/scheduler-test-util.cc        |  10 +-
 be/src/scheduling/scheduler.cc                  |   7 +-
 be/src/scheduling/scheduler.h                   |   2 -
 be/src/service/impala-server.cc                 |   6 +-
 be/src/statestore/statestore-subscriber.cc      | 219 ++++++---
 be/src/statestore/statestore-subscriber.h       | 128 ++---
 be/src/statestore/statestore.cc                 | 490 ++++++++++++-------
 be/src/statestore/statestore.h                  | 295 +++++++----
 common/thrift/metrics.json                      |  25 +-
 .../custom_cluster/test_admission_controller.py |  20 +-
 tests/statestore/test_statestore.py             | 109 +++--
 www/statestore_subscribers.tmpl                 |   2 +
 www/statestore_topics.tmpl                      |   2 +
 15 files changed, 843 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/admission-controller.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.cc b/be/src/scheduling/admission-controller.cc
index f43af2c..640a6af 100644
--- a/be/src/scheduling/admission-controller.cc
+++ b/be/src/scheduling/admission-controller.cc
@@ -50,8 +50,6 @@ int64_t GetProcMemLimit() {
   return ExecEnv::GetInstance()->process_mem_tracker()->limit();
 }
 
-const string AdmissionController::IMPALA_REQUEST_QUEUE_TOPIC("impala-request-queue");
-
 // Delimiter used for topic keys of the form "<pool_name><delimiter><backend_id>".
 // "!" is used because the backend id contains a colon, but it should not contain "!".
 // When parsing the topic key we need to be careful to find the last instance in
@@ -243,7 +241,7 @@ Status AdmissionController::Init() {
       &AdmissionController::DequeueLoop, this, &dequeue_thread_));
   StatestoreSubscriber::UpdateCallback cb =
     bind<void>(mem_fn(&AdmissionController::UpdatePoolStats), this, _1, _2);
-  Status status = subscriber_->AddTopic(IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
+  Status status = subscriber_->AddTopic(Statestore::IMPALA_REQUEST_QUEUE_TOPIC, true, cb);
   if (!status.ok()) {
     status.AddDetail("AdmissionController failed to register request queue topic");
   }
@@ -632,7 +630,7 @@ void AdmissionController::UpdatePoolStats(
     AddPoolUpdates(subscriber_topic_updates);
 
     StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-        incoming_topic_deltas.find(IMPALA_REQUEST_QUEUE_TOPIC);
+        incoming_topic_deltas.find(Statestore::IMPALA_REQUEST_QUEUE_TOPIC);
     if (topic != incoming_topic_deltas.end()) {
       const TTopicDelta& delta = topic->second;
       // Delta and non-delta updates are handled the same way, except for a full update
@@ -799,7 +797,7 @@ void AdmissionController::AddPoolUpdates(vector<TTopicDelta>* topic_updates) {
   if (pools_for_updates_.empty()) return;
   topic_updates->push_back(TTopicDelta());
   TTopicDelta& topic_delta = topic_updates->back();
-  topic_delta.topic_name = IMPALA_REQUEST_QUEUE_TOPIC;
+  topic_delta.topic_name = Statestore::IMPALA_REQUEST_QUEUE_TOPIC;
   for (const string& pool_name: pools_for_updates_) {
     DCHECK(pool_stats_.find(pool_name) != pool_stats_.end());
     PoolStats* stats = GetPoolStats(pool_name);

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/admission-controller.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/admission-controller.h b/be/src/scheduling/admission-controller.h
index 2830bee..3341b1b 100644
--- a/be/src/scheduling/admission-controller.h
+++ b/be/src/scheduling/admission-controller.h
@@ -206,9 +206,6 @@ class AdmissionController {
   class PoolStats;
   friend class PoolStats;
 
-  /// Statestore topic name.
-  static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
-
   /// Subscription manager used to handle admission control updates. This is not
   /// owned by this class.
   StatestoreSubscriber* subscriber_;

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/scheduler-test-util.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler-test-util.cc b/be/src/scheduling/scheduler-test-util.cc
index 05cfc42..7363cd3 100644
--- a/be/src/scheduling/scheduler-test-util.cc
+++ b/be/src/scheduling/scheduler-test-util.cc
@@ -467,7 +467,7 @@ Status SchedulerWrapper::Compute(bool exec_at_coord, Result* result) {
 void SchedulerWrapper::AddBackend(const Host& host) {
   // Add to topic delta
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
   AddHostToTopicDelta(host, &delta);
   SendTopicDelta(delta);
@@ -476,7 +476,7 @@ void SchedulerWrapper::AddBackend(const Host& host) {
 void SchedulerWrapper::RemoveBackend(const Host& host) {
   // Add deletion to topic delta
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
   TTopicItem item;
   item.__set_deleted(true);
@@ -487,7 +487,7 @@ void SchedulerWrapper::RemoveBackend(const Host& host) {
 
 void SchedulerWrapper::SendFullMembershipMap() {
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = false;
   for (const Host& host : plan_.cluster().hosts()) {
     if (host.be_port >= 0) AddHostToTopicDelta(host, &delta);
@@ -497,7 +497,7 @@ void SchedulerWrapper::SendFullMembershipMap() {
 
 void SchedulerWrapper::SendEmptyUpdate() {
   TTopicDelta delta;
-  delta.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  delta.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   delta.is_delta = true;
   SendTopicDelta(delta);
 }
@@ -547,7 +547,7 @@ void SchedulerWrapper::SendTopicDelta(const TTopicDelta& delta) {
   DCHECK(scheduler_ != nullptr);
   // Wrap in topic delta map.
   StatestoreSubscriber::TopicDeltaMap delta_map;
-  delta_map.emplace(Scheduler::IMPALA_MEMBERSHIP_TOPIC, delta);
+  delta_map.emplace(Statestore::IMPALA_MEMBERSHIP_TOPIC, delta);
 
   // Send to the scheduler.
   vector<TTopicDelta> dummy_result;

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/scheduler.cc
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 527b5cf..5a9d4bf 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -52,8 +52,6 @@ static const string ASSIGNMENTS_KEY("simple-scheduler.assignments.total");
 static const string SCHEDULER_INIT_KEY("simple-scheduler.initialized");
 static const string NUM_BACKENDS_KEY("simple-scheduler.num-backends");
 
-const string Scheduler::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
-
 Scheduler::Scheduler(StatestoreSubscriber* subscriber, const string& backend_id,
     MetricGroup* metrics, Webserver* webserver, RequestPoolService* request_pool_service)
   : executors_config_(std::make_shared<const BackendConfig>()),
@@ -86,7 +84,8 @@ Status Scheduler::Init(const TNetworkAddress& backend_address,
   if (statestore_subscriber_ != nullptr) {
     StatestoreSubscriber::UpdateCallback cb =
         bind<void>(mem_fn(&Scheduler::UpdateMembership), this, _1, _2);
-    Status status = statestore_subscriber_->AddTopic(IMPALA_MEMBERSHIP_TOPIC, true, cb);
+    Status status = statestore_subscriber_->AddTopic(
+        Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb);
     if (!status.ok()) {
       status.AddDetail("Scheduler failed to register membership topic");
       return status;
@@ -123,7 +122,7 @@ void Scheduler::UpdateMembership(
     vector<TTopicDelta>* subscriber_topic_updates) {
   // First look to see if the topic(s) we're interested in have an update
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-      incoming_topic_deltas.find(IMPALA_MEMBERSHIP_TOPIC);
+      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
 
   if (topic == incoming_topic_deltas.end()) return;
   const TTopicDelta& delta = topic->second;

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/scheduling/scheduler.h
----------------------------------------------------------------------
diff --git a/be/src/scheduling/scheduler.h b/be/src/scheduling/scheduler.h
index 4be2996..2d77797 100644
--- a/be/src/scheduling/scheduler.h
+++ b/be/src/scheduling/scheduler.h
@@ -68,8 +68,6 @@ class SchedulerWrapper;
 ///           configuration.
 class Scheduler {
  public:
-  static const std::string IMPALA_MEMBERSHIP_TOPIC;
-
   /// List of server descriptors.
   typedef std::vector<TBackendDescriptor> BackendList;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index d5be4dc..3866e40 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -357,7 +357,7 @@ ImpalaServer::ImpalaServer(ExecEnv* exec_env)
       this->MembershipCallback(state, topic_updates);
     };
     ABORT_IF_ERROR(
-        exec_env->subscriber()->AddTopic(Scheduler::IMPALA_MEMBERSHIP_TOPIC, true, cb));
+        exec_env->subscriber()->AddTopic(Statestore::IMPALA_MEMBERSHIP_TOPIC, true, cb));
 
     if (FLAGS_is_coordinator) {
       auto catalog_cb = [this] (const StatestoreSubscriber::TopicDeltaMap& state,
@@ -1482,7 +1482,7 @@ void ImpalaServer::MembershipCallback(
   // TODO: Consider rate-limiting this. In the short term, best to have
   // statestore heartbeat less frequently.
   StatestoreSubscriber::TopicDeltaMap::const_iterator topic =
-      incoming_topic_deltas.find(Scheduler::IMPALA_MEMBERSHIP_TOPIC);
+      incoming_topic_deltas.find(Statestore::IMPALA_MEMBERSHIP_TOPIC);
 
   if (topic != incoming_topic_deltas.end()) {
     const TTopicDelta& delta = topic->second;
@@ -1607,7 +1607,7 @@ void ImpalaServer::AddLocalBackendToStatestore(
   }
   subscriber_topic_updates->emplace_back(TTopicDelta());
   TTopicDelta& update = subscriber_topic_updates->back();
-  update.topic_name = Scheduler::IMPALA_MEMBERSHIP_TOPIC;
+  update.topic_name = Statestore::IMPALA_MEMBERSHIP_TOPIC;
   update.topic_entries.emplace_back(TTopicItem());
 
   TTopicItem& item = update.topic_entries.back();

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index e58c177..a27d1ae 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -22,6 +22,7 @@
 
 #include <boost/algorithm/string/join.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread/lock_options.hpp>
 #include <boost/thread/shared_mutex.hpp>
 #include <gutil/strings/substitute.h>
 
@@ -39,6 +40,9 @@
 #include "common/names.h"
 
 using boost::posix_time::seconds;
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::try_to_lock;
 using namespace apache::thrift;
 using namespace apache::thrift::transport;
 using namespace strings;
@@ -65,6 +69,9 @@ const string STATESTORE_ID = "STATESTORE";
 // Template for metrics that measure the processing time for individual topics.
 const string CALLBACK_METRIC_PATTERN = "statestore-subscriber.topic-$0.processing-time-s";
 
+// Template for metrics that measure the interval between updates for individual topics.
+const string UPDATE_INTERVAL_METRIC_PATTERN = "statestore-subscriber.topic-$0.update-interval";
+
 // Duration, in ms, to sleep between attempts to reconnect to the
 // statestore after a failure.
 const int32_t SLEEP_INTERVAL_MS = 5000;
@@ -107,41 +114,42 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
       failure_detector_(new TimeoutFailureDetector(
           seconds(FLAGS_statestore_subscriber_timeout_seconds),
           seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
-      is_registered_(false),
       client_cache_(new StatestoreClientCache(FLAGS_statestore_subscriber_cnxn_attempts,
-          FLAGS_statestore_subscriber_cnxn_retry_interval_ms, 0, 0, "",
-          !FLAGS_ssl_client_ca_certificate.empty())),
-      metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")) {
+                FLAGS_statestore_subscriber_cnxn_retry_interval_ms, 0, 0, "",
+                !FLAGS_ssl_client_ca_certificate.empty())),
+      metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")),
+      is_registered_(false) {
   connected_to_statestore_metric_ =
       metrics_->AddProperty("statestore-subscriber.connected", false);
   last_recovery_duration_metric_ = metrics_->AddDoubleGauge(
       "statestore-subscriber.last-recovery-duration", 0.0);
   last_recovery_time_metric_ = metrics_->AddProperty<string>(
       "statestore-subscriber.last-recovery-time", "N/A");
-  topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
+  topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
       "statestore-subscriber.topic-update-interval-time");
-  topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
+  topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
       "statestore-subscriber.topic-update-duration");
-  heartbeat_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
+  heartbeat_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics_,
       "statestore-subscriber.heartbeat-interval-time");
-
   registration_id_metric_ = metrics->AddProperty<string>(
       "statestore-subscriber.registration-id", "N/A");
-
   client_cache_->InitMetrics(metrics, "statestore-subscriber.statestore");
 }
 
 Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
     bool is_transient, const UpdateCallback& callback) {
-  lock_guard<mutex> l(lock_);
+  lock_guard<shared_mutex> exclusive_lock(lock_);
   if (is_registered_) return Status("Subscriber already started, can't add new topic");
-  Callbacks* cb = &(update_callbacks_[topic_id]);
-  cb->callbacks.push_back(callback);
-  if (cb->processing_time_metric == NULL) {
-    cb->processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
+  TopicRegistration& registration = topic_registrations_[topic_id];
+  registration.callbacks.push_back(callback);
+  if (registration.processing_time_metric == nullptr) {
+    registration.processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
         CALLBACK_METRIC_PATTERN, topic_id);
+    registration.update_interval_metric = StatsMetric<double>::CreateAndRegister(metrics_,
+        UPDATE_INTERVAL_METRIC_PATTERN, topic_id);
+    registration.update_interval_timer.Start();
   }
-  topic_registrations_[topic_id] = is_transient;
+  registration.is_transient = is_transient;
   return Status::OK();
 }
 
@@ -151,11 +159,10 @@ Status StatestoreSubscriber::Register() {
   RETURN_IF_ERROR(client_status);
 
   TRegisterSubscriberRequest request;
-  request.topic_registrations.reserve(update_callbacks_.size());
-  for (const UpdateCallbacks::value_type& topic: update_callbacks_) {
+  for (const auto& registration : topic_registrations_) {
     TTopicRegistration thrift_topic;
-    thrift_topic.topic_name = topic.first;
-    thrift_topic.is_transient = topic_registrations_[topic.first];
+    thrift_topic.topic_name = registration.first;
+    thrift_topic.is_transient = registration.second.is_transient;
     request.topic_registrations.push_back(thrift_topic);
   }
 
@@ -175,7 +182,6 @@ Status StatestoreSubscriber::Register() {
   } else {
     VLOG(1) << "No subscriber registration ID received from statestore";
   }
-  topic_update_interval_timer_.Start();
   heartbeat_interval_timer_.Start();
   return status;
 }
@@ -186,7 +192,7 @@ Status StatestoreSubscriber::Start() {
     // Take the lock to ensure that, if a topic-update is received during registration
     // (perhaps because Register() has succeeded, but we haven't finished setting up state
     // on the client side), UpdateState() will reject the message.
-    lock_guard<mutex> l(lock_);
+    lock_guard<shared_mutex> exclusive_lock(lock_);
     LOG(INFO) << "Starting statestore subscriber";
 
     // Backend must be started before registration
@@ -241,7 +247,7 @@ void StatestoreSubscriber::RecoveryModeChecker() {
     if (failure_detector_->GetPeerState(STATESTORE_ID) == FailureDetector::FAILED) {
       // When entering recovery mode, the class-wide lock_ is taken to
       // ensure mutual exclusion with any operations in flight.
-      lock_guard<mutex> l(lock_);
+      lock_guard<shared_mutex> exclusive_lock(lock_);
       MonotonicStopWatch recovery_timer;
       recovery_timer.Start();
       connected_to_statestore_metric_->SetValue(false);
@@ -313,74 +319,127 @@ void StatestoreSubscriber::Heartbeat(const RegistrationId& registration_id) {
 Status StatestoreSubscriber::UpdateState(const TopicDeltaMap& incoming_topic_deltas,
     const RegistrationId& registration_id, vector<TTopicDelta>* subscriber_topic_updates,
     bool* skipped) {
+  RETURN_IF_ERROR(CheckRegistrationId(registration_id));
+
+  // Put the updates into ascending order of topic name to match the lock acquisition
+  // order of TopicRegistration::update_lock.
+  vector<const TTopicDelta*> deltas_to_process;
+  for (auto& delta : incoming_topic_deltas) deltas_to_process.push_back(&delta.second);
+  sort(deltas_to_process.begin(), deltas_to_process.end(),
+      [](const TTopicDelta* left, const TTopicDelta* right) {
+        return left->topic_name < right->topic_name;
+      });
+  // Unique locks to hold the 'update_lock' for each entry in 'deltas_to_process'. Locks
+  // are held until we finish processing the update to prevent any races with concurrent
+  // updates for the same topic.
+  vector<unique_lock<mutex>> topic_update_locks(deltas_to_process.size());
+
   // We don't want to block here because this is an RPC, and delaying the return causes
   // the statestore to delay sending further messages. The only time that lock_ might be
-  // taken concurrently is if:
+  // taken exclusively is if the subscriber is recovering, and has the lock held during
+  // RecoveryModeChecker(). In this case we skip all topics and don't update any metrics.
   //
-  // a) another update is still being processed (i.e. is still in UpdateState()). This
-  // could happen only when the subscriber has re-registered, and the statestore is still
-  // sending an update for the previous registration. In this case, return OK but set
-  // *skipped = true to tell the statestore to retry this update in the future.
+  // UpdateState() may run concurrently with itself in two cases:
+  // a) disjoint sets of topics are being updated. In that case the updates can proceed
+  // concurrently.
+  // b) another update for the same topics is still being processed (i.e. is still in
+  // UpdateState()). This could happen only when the subscriber has re-registered, and
+  // the statestore is still sending an update for the previous registration. In this
+  // case, we notice that the per-topic 'update_lock' is held, skip processing all
+  // of the topic updates and set *skipped = true so that the statestore will retry this
+  // update in the future.
   //
-  // b) the subscriber is recovering, and has the lock held during
-  // RecoveryModeChecker(). Similarly, we set *skipped = true.
   // TODO: Consider returning an error in this case so that the statestore will eventually
   // stop sending updates even if re-registration fails.
-  try_mutex::scoped_try_lock l(lock_);
-  if (l) {
-    *skipped = false;
-    RETURN_IF_ERROR(CheckRegistrationId(registration_id));
-
-    // Only record updates received when not in recovery mode
-    topic_update_interval_metric_->Update(
-        topic_update_interval_timer_.Reset() / (1000.0 * 1000.0 * 1000.0));
-    MonotonicStopWatch sw;
-    sw.Start();
-
-    // Check the version ranges of all delta updates to ensure they can be applied
-    // to this subscriber. If any invalid ranges are found, request new update(s) with
-    // version ranges applicable to this subscriber.
-    bool found_unexpected_delta = false;
-    for (const TopicDeltaMap::value_type& delta: incoming_topic_deltas) {
-      TopicVersionMap::const_iterator itr = current_topic_versions_.find(delta.first);
-      if (itr != current_topic_versions_.end()) {
-        if (delta.second.is_delta && delta.second.from_version != itr->second) {
-          LOG(ERROR) << "Unexpected delta update to topic '" << delta.first << "' of "
-                     << "version range (" << delta.second.from_version << ":"
-                     << delta.second.to_version << "]. Expected delta start version: "
-                     << itr->second;
-
-          subscriber_topic_updates->push_back(TTopicDelta());
-          TTopicDelta& update = subscriber_topic_updates->back();
-          update.topic_name = delta.second.topic_name;
-          update.__set_from_version(itr->second);
-          found_unexpected_delta = true;
-        } else {
-          // Update the current topic version
-          current_topic_versions_[delta.first] = delta.second.to_version;
-        }
-      }
+  shared_lock<shared_mutex> l(lock_, try_to_lock);
+  if (!l.owns_lock()) {
+    *skipped = true;
+    return Status::OK();
+  }
+
+  // First, acquire all the topic locks and update the interval metrics
+  // Record the time we received the update before doing any processing to avoid including
+  // processing time in the interval metrics.
+  for (int i = 0; i < deltas_to_process.size(); ++i) {
+    const TTopicDelta& delta = *deltas_to_process[i];
+    auto it = topic_registrations_.find(delta.topic_name);
+    // Skip updates to unregistered topics.
+    if (it == topic_registrations_.end()) {
+      LOG(ERROR) << "Unexpected delta update for unregistered topic: "
+                 << delta.topic_name;
+      continue;
+    }
+    TopicRegistration& registration = it->second;
+    unique_lock<mutex> ul(registration.update_lock, try_to_lock);
+    if (!ul.owns_lock()) {
+      // Statestore sent out concurrent topic updates. Avoid blocking the RPC by skipping
+      // the topic.
+      LOG(ERROR) << "Could not acquire lock for topic " << delta.topic_name << ". "
+                 << "Skipping update.";
+      *skipped = true;
+      return Status::OK();
     }
+    double interval =
+        registration.update_interval_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0);
+    registration.update_interval_metric->Update(interval);
+    topic_update_interval_metric_->Update(interval);
 
-    // Skip calling the callbacks when an unexpected delta update is found.
-    if (!found_unexpected_delta) {
-      for (const UpdateCallbacks::value_type& callbacks: update_callbacks_) {
-        MonotonicStopWatch sw;
-        sw.Start();
-        for (const UpdateCallback& callback: callbacks.second.callbacks) {
-          // TODO: Consider filtering the topics to only send registered topics to
-          // callbacks
-          callback(incoming_topic_deltas, subscriber_topic_updates);
-        }
-        callbacks.second.processing_time_metric->Update(
-            sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-      }
+    // Hold onto lock until we've finished processing the update.
+    topic_update_locks[i].swap(ul);
+  }
+
+  MonotonicStopWatch sw;
+  sw.Start();
+  // Second, do the actual processing of topic updates that we validated and acquired
+  // locks for above.
+  for (int i = 0; i < deltas_to_process.size(); ++i) {
+    if (!topic_update_locks[i].owns_lock()) continue;
+
+    const TTopicDelta& delta = *deltas_to_process[i];
+    auto it = topic_registrations_.find(delta.topic_name);
+    DCHECK(it != topic_registrations_.end());
+    TopicRegistration& registration = it->second;
+    if (delta.is_delta && registration.current_topic_version != -1
+      && delta.from_version != registration.current_topic_version) {
+      // Received a delta update for the wrong version. Log an error and send back the
+      // expected version to the statestore to request a new update with the correct
+      // version range.
+      LOG(ERROR) << "Unexpected delta update to topic '" << delta.topic_name << "' of "
+                 << "version range (" << delta.from_version << ":"
+                 << delta.to_version << "]. Expected delta start version: "
+                 << registration.current_topic_version;
+
+      subscriber_topic_updates->push_back(TTopicDelta());
+      TTopicDelta& update = subscriber_topic_updates->back();
+      update.topic_name = delta.topic_name;
+      update.__set_from_version(registration.current_topic_version);
+      continue;
     }
-    sw.Stop();
-    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
-  } else {
-    *skipped = true;
+    // The topic version in the update is valid, process the update.
+    MonotonicStopWatch update_callback_sw;
+    update_callback_sw.Start();
+    for (const UpdateCallback& callback : registration.callbacks) {
+      callback(incoming_topic_deltas, subscriber_topic_updates);
+    }
+    update_callback_sw.Stop();
+    registration.current_topic_version = delta.to_version;
+    registration.processing_time_metric->Update(
+        sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
+  }
+
+  // Third and finally, reset the interval timers so they correctly measure the
+  // time between RPCs, excluding processing time.
+  for (int i = 0; i < deltas_to_process.size(); ++i) {
+    if (!topic_update_locks[i].owns_lock()) continue;
+
+    const TTopicDelta& delta = *deltas_to_process[i];
+    auto it = topic_registrations_.find(delta.topic_name);
+    DCHECK(it != topic_registrations_.end());
+    TopicRegistration& registration = it->second;
+    registration.update_interval_timer.Reset();
   }
+  sw.Stop();
+  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index e8b2204..f102cae 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -24,6 +24,7 @@
 #include <boost/scoped_ptr.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/thread/mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
 
 #include "statestore/statestore.h"
 #include "util/stopwatch.h"
@@ -84,9 +85,9 @@ class StatestoreSubscriber {
   /// Function called to update a service with new state. Called in a
   /// separate thread to the one in which it is registered.
   //
-  /// Every UpdateCallback is invoked every time that an update is
-  /// received from the statestore. Therefore the callback should not
-  /// assume that the TopicDeltaMap contains an entry for their
+  /// Every UpdateCallback is invoked every time that an update for the
+  /// topic is received from the statestore. Therefore the callback should
+  /// not assume that the TopicDeltaMap contains an entry for their
   /// particular topic of interest.
   //
   /// If a delta for a particular topic does not have the 'is_delta'
@@ -145,55 +146,7 @@ class StatestoreSubscriber {
   /// Thread in which RecoveryModeChecker runs.
   std::unique_ptr<Thread> recovery_mode_thread_;
 
-  /// Class-wide lock. Protects all subsequent members. Most private methods must
-  /// be called holding this lock; this is noted in the method comments.
-  boost::mutex lock_;
-
-  /// Set to true after Register(...) is successful, after which no
-  /// more topics may be subscribed to.
-  bool is_registered_;
-
-  /// Protects registration_id_. Must be taken after lock_ if both are to be taken
-  /// together.
-  boost::mutex registration_id_lock_;
-
-  /// Set during Register(), this is the unique ID of the current registration with the
-  /// statestore. If this subscriber must recover, or disconnects and then reconnects, the
-  /// registration_id_ will change after Register() is called again. This allows the
-  /// subscriber to reject communication from the statestore that pertains to a previous
-  /// registration.
-  RegistrationId registration_id_;
-
-  struct Callbacks {
-    /// Owned by the MetricGroup instance. Tracks how long callbacks took to process this
-    /// topic.
-    StatsMetric<double>* processing_time_metric;
-
-    /// List of callbacks to invoke for this topic.
-    std::vector<UpdateCallback> callbacks;
-  };
-
-  /// Mapping of topic ids to their associated callbacks. Because this mapping
-  /// stores a pointer to an UpdateCallback, memory errors will occur if an UpdateCallback
-  /// is deleted before being unregistered. The UpdateCallback destructor checks for
-  /// such problems, so that we will have an assertion failure rather than a memory error.
-  typedef boost::unordered_map<Statestore::TopicId, Callbacks> UpdateCallbacks;
-
-  /// Callback for all services that have registered for updates (indexed by the associated
-  /// SubscriptionId), and associated lock.
-  UpdateCallbacks update_callbacks_;
-
-  /// One entry for every topic subscribed to. The value is whether this subscriber
-  /// considers this topic to be 'transient', that is any updates it makes will be deleted
-  /// upon failure or disconnection.
-  std::map<Statestore::TopicId, bool> topic_registrations_;
-
-  /// Mapping of TopicId to the last version of the topic this subscriber successfully
-  /// processed.
-  typedef boost::unordered_map<Statestore::TopicId, int64_t> TopicVersionMap;
-  TopicVersionMap current_topic_versions_;
-
-  /// statestore client cache - only one client is ever used.
+  /// statestore client cache - only one client is ever used. Initialized in constructor.
   boost::scoped_ptr<StatestoreClientCache> client_cache_;
 
   /// MetricGroup instance that all metrics are registered in. Not owned by this class.
@@ -208,25 +161,82 @@ class StatestoreSubscriber {
   /// When the last recovery happened.
   StringProperty* last_recovery_time_metric_;
 
-  /// Accumulated statistics on the frequency of topic-update messages
+  /// Accumulated statistics on the frequency of topic-update messages, including samples
+  /// from all topics.
   StatsMetric<double>* topic_update_interval_metric_;
 
-  /// Tracks the time between topic-update mesages
-  MonotonicStopWatch topic_update_interval_timer_;
-
   /// Accumulated statistics on the time taken to process each topic-update message from
   /// the statestore (that is, to call all callbacks)
   StatsMetric<double>* topic_update_duration_metric_;
 
-  /// Tracks the time between heartbeat mesages
-  MonotonicStopWatch heartbeat_interval_timer_;
-
   /// Accumulated statistics on the frequency of heartbeat messages
   StatsMetric<double>* heartbeat_interval_metric_;
 
+  /// Tracks the time between heartbeat messages. Only updated by Heartbeat(), which
+  /// should not run concurrently with itself.
+  MonotonicStopWatch heartbeat_interval_timer_;
+
   /// Current registration ID, in string form.
   StringProperty* registration_id_metric_;
 
+  /// Object-wide lock that protects the below members. Must be held exclusively when
+  /// modifying the members, except when modifying TopicRegistrations - see
+  /// TopicRegistration::update_lock for details of locking there. Held in shared mode
+  /// when processing topic updates to prevent concurrent updates to other state. Most
+  /// private methods must be called holding this lock; this is noted in the method
+  /// comments.
+  boost::shared_mutex lock_;
+
+  /// Set to true after Register(...) is successful, after which no
+  /// more topics may be subscribed to.
+  bool is_registered_;
+
+  /// Protects registration_id_. Must be taken after lock_ if both are to be taken
+  /// together.
+  boost::mutex registration_id_lock_;
+
+  /// Set during Register(), this is the unique ID of the current registration with the
+  /// statestore. If this subscriber must recover, or disconnects and then reconnects, the
+  /// registration_id_ will change after Register() is called again. This allows the
+  /// subscriber to reject communication from the statestore that pertains to a previous
+  /// registration.
+  RegistrationId registration_id_;
+
+  struct TopicRegistration {
+    /// Held when processing a topic update. 'StatestoreSubscriber::lock_' must be held in
+    /// shared mode before acquiring this lock. If taking multiple update locks, they must
+    /// be acquired in ascending order of topic name.
+    boost::mutex update_lock;
+
+    /// Whether the subscriber considers this topic to be "transient", that is any updates
+    /// it makes will be deleted upon failure or disconnection.
+    bool is_transient = false;
+
+    /// The last version of the topic this subscriber processed.
+    /// -1 if no updates have been processed yet.
+    int64_t current_topic_version = -1;
+
+    /// Owned by the MetricGroup instance. Tracks how long callbacks took to process this
+    /// topic.
+    StatsMetric<double>* processing_time_metric = nullptr;
+
+    /// Tracks the time between topic-update messages to update 'update_interval_metric'.
+    MonotonicStopWatch update_interval_timer;
+
+    /// Owned by the MetricGroup instances. Tracks the time between the end of the last
+    /// update RPC for this topic and the start of the next.
+    StatsMetric<double>* update_interval_metric = nullptr;
+
+    /// Callback for all services that have registered for updates.
+    std::vector<UpdateCallback> callbacks;
+  };
+
+  /// One entry for every topic subscribed to. 'lock_' must be held exclusively to add or
+  /// remove entries from the map or held as a shared lock to lookup entries in the map.
+  /// Modifications to the contents of each TopicRegistration are protected by
+  /// TopicRegistration::update_lock.
+  boost::unordered_map<Statestore::TopicId, TopicRegistration> topic_registrations_;
+
   /// Subscriber thrift implementation, needs to access UpdateState
   friend class StatestoreSubscriberThriftIf;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 8f4ddbf..1072dee 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -17,6 +17,10 @@
 
 #include "statestore/statestore.h"
 
+#include <algorithm>
+#include <tuple>
+#include <utility>
+
 #include <boost/lexical_cast.hpp>
 #include <boost/thread.hpp>
 #include <thrift/Thrift.h>
@@ -36,6 +40,12 @@
 
 #include "common/names.h"
 
+using boost::shared_lock;
+using boost::shared_mutex;
+using boost::upgrade_lock;
+using boost::upgrade_to_unique_lock;
+using std::forward_as_tuple;
+using std::piecewise_construct;
 using namespace apache::thrift;
 using namespace impala;
 using namespace rapidjson;
@@ -50,6 +60,15 @@ DEFINE_int32(statestore_num_update_threads, 10, "(Advanced) Number of threads us
 DEFINE_int32(statestore_update_frequency_ms, 2000, "(Advanced) Frequency (in ms) with"
     " which the statestore sends topic updates to subscribers.");
 
+// Priority updates are sent out much more frequently. They are assumed to be small
+// amounts of data that take a small amount of time to process. Assuming each update
+// takes < 1ms to process, sending out an update every 100ms will consume less than
+// 1% of a CPU on each subscriber.
+DEFINE_int32(statestore_num_priority_update_threads, 10, "(Advanced) Number of threads "
+    "used to send prioritized topic updates in parallel to all registered subscribers.");
+DEFINE_int32(statestore_priority_update_frequency_ms, 100, "(Advanced) Frequency (in ms) "
+    "with which the statestore sends prioritized topic updates to subscribers.");
+
 DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads used to "
     " send heartbeats in parallel to all registered subscribers.");
 DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with"
@@ -87,18 +106,23 @@ const string STATESTORE_TOTAL_KEY_SIZE_BYTES = "statestore.total-key-size-bytes"
 const string STATESTORE_TOTAL_VALUE_SIZE_BYTES = "statestore.total-value-size-bytes";
 const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-bytes";
 const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
+const string STATESTORE_PRIORITY_UPDATE_DURATION =
+    "statestore.priority-topic-update-durations";
 const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
 
 // Initial version for each Topic registered by a Subscriber. Generally, the Topic will
 // have a Version that is the MAX() of all entries in the Topic, but this initial
 // value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
 // between the case where a Topic is empty and the case where the Topic only contains
-// an item with the initial version.
+// an entry with the initial version.
 const Statestore::TopicEntry::Version Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0;
 
 // Updates or heartbeats that miss their deadline by this much are logged.
 const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
 
+const string Statestore::IMPALA_MEMBERSHIP_TOPIC("impala-membership");
+const string Statestore::IMPALA_REQUEST_QUEUE_TOPIC("impala-request-queue");
+
 typedef ClientConnection<StatestoreSubscriberClientWrapper> StatestoreSubscriberConn;
 
 class StatestoreThriftIf : public StatestoreServiceIf {
@@ -127,46 +151,55 @@ void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes
   version_ = version;
 }
 
-Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
-    const Statestore::TopicEntry::Value& bytes, bool is_deleted) {
-  TopicEntryMap::iterator entry_it = entries_.find(key);
-  int64_t key_size_delta = 0;
-  int64_t value_size_delta = 0;
-  if (entry_it == entries_.end()) {
-    entry_it = entries_.insert(make_pair(key, TopicEntry())).first;
-    key_size_delta += key.size();
-  } else {
-    // Delete the old item from the version history. There is no need to search the
-    // version_history because there should only be at most a single item in the history
-    // at any given time
-    topic_update_log_.erase(entry_it->second.version());
-    value_size_delta -= entry_it->second.value().size();
+vector<Statestore::TopicEntry::Version> Statestore::Topic::Put(
+    const std::vector<TTopicItem>& entries) {
+  vector<Statestore::TopicEntry::Version> versions;
+  versions.reserve(entries.size());
+
+  // Acquire exclusive lock - we are modifying the topic.
+  lock_guard<shared_mutex> write_lock(lock_);
+  for (const TTopicItem& entry: entries) {
+    TopicEntryMap::iterator entry_it = entries_.find(entry.key);
+    int64_t key_size_delta = 0;
+    int64_t value_size_delta = 0;
+    if (entry_it == entries_.end()) {
+      entry_it = entries_.emplace(entry.key, TopicEntry()).first;
+      key_size_delta += entry.key.size();
+    } else {
+      // Delete the old entry from the version history. There is no need to search the
+      // version_history because there should only be at most a single entry in the
+      // history at any given time.
+      topic_update_log_.erase(entry_it->second.version());
+      value_size_delta -= entry_it->second.value().size();
+    }
+    value_size_delta += entry.value.size();
+
+    entry_it->second.SetValue(entry.value, ++last_version_);
+    entry_it->second.SetDeleted(entry.deleted);
+    topic_update_log_.emplace(entry_it->second.version(), entry.key);
+
+    total_key_size_bytes_ += key_size_delta;
+    total_value_size_bytes_ += value_size_delta;
+    DCHECK_GE(total_key_size_bytes_, static_cast<int64_t>(0));
+    DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
+    key_size_metric_->Increment(key_size_delta);
+    value_size_metric_->Increment(value_size_delta);
+    topic_size_metric_->Increment(key_size_delta + value_size_delta);
+    versions.push_back(entry_it->second.version());
   }
-  value_size_delta += bytes.size();
-
-  entry_it->second.SetValue(bytes, ++last_version_);
-  entry_it->second.SetDeleted(is_deleted);
-  topic_update_log_.insert(make_pair(entry_it->second.version(), key));
-
-  total_key_size_bytes_ += key_size_delta;
-  total_value_size_bytes_ += value_size_delta;
-  DCHECK_GE(total_key_size_bytes_, static_cast<int64_t>(0));
-  DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));
-  key_size_metric_->Increment(key_size_delta);
-  value_size_metric_->Increment(value_size_delta);
-  topic_size_metric_->Increment(key_size_delta + value_size_delta);
-
-  return entry_it->second.version();
+  return versions;
 }
 
 void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
     const Statestore::TopicEntryKey& key) {
+  // Acquire exclusive lock - we are modifying the topic.
+  lock_guard<shared_mutex> write_lock(lock_);
   TopicEntryMap::iterator entry_it = entries_.find(key);
   if (entry_it != entries_.end() && entry_it->second.version() == version) {
     // Add a new entry with the the version history for this deletion and remove the old
     // entry
     topic_update_log_.erase(version);
-    topic_update_log_.insert(make_pair(++last_version_, key));
+    topic_update_log_.emplace(++last_version_, key);
     value_size_metric_->Increment(entry_it->second.value().size());
     topic_size_metric_->Increment(entry_it->second.value().size());
     entry_it->second.SetDeleted(true);
@@ -174,6 +207,79 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
   }
 }
 
+void Statestore::Topic::BuildDelta(const SubscriberId& subscriber_id,
+    TopicEntry::Version last_processed_version, TTopicDelta* delta) {
+  // If the subscriber version is > 0, send this update as a delta. Otherwise, this is
+  // a new subscriber so send them a non-delta update that includes all entries in the
+  // topic.
+  delta->is_delta = last_processed_version > Subscriber::TOPIC_INITIAL_VERSION;
+  delta->__set_from_version(last_processed_version);
+  {
+    // Acquire shared lock - we are not modifying the topic.
+    shared_lock<shared_mutex> read_lock(lock_);
+    TopicUpdateLog::const_iterator next_update =
+        topic_update_log_.upper_bound(last_processed_version);
+
+    uint64_t topic_size = 0;
+    for (; next_update != topic_update_log_.end(); ++next_update) {
+      TopicEntryMap::const_iterator itr = entries_.find(next_update->second);
+      DCHECK(itr != entries_.end());
+      const TopicEntry& topic_entry = itr->second;
+      // Don't send deleted entries for non-delta updates.
+      if (!delta->is_delta && topic_entry.is_deleted()) {
+        continue;
+      }
+      delta->topic_entries.push_back(TTopicItem());
+      TTopicItem& delta_entry = delta->topic_entries.back();
+      delta_entry.key = itr->first;
+      delta_entry.value = topic_entry.value();
+      delta_entry.deleted = topic_entry.is_deleted();
+      topic_size += delta_entry.key.size() + delta_entry.value.size();
+    }
+
+    if (!delta->is_delta &&
+        last_version_ > Subscriber::TOPIC_INITIAL_VERSION) {
+      VLOG_QUERY << "Preparing initial " << delta->topic_name
+                 << " topic update for " << subscriber_id << ". Size = "
+                 << PrettyPrinter::Print(topic_size, TUnit::BYTES);
+    }
+
+    if (topic_update_log_.size() > 0) {
+      // The largest version for this topic will be the last entry in the version history
+      // map.
+      delta->__set_to_version(topic_update_log_.rbegin()->first);
+    } else {
+      // There are no updates in the version history
+      delta->__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
+    }
+  }
+}
+void Statestore::Topic::ToJson(Document* document, Value* topic_json) {
+  // Acquire shared lock - we are not modifying the topic.
+  shared_lock<shared_mutex> read_lock(lock_);
+  Value topic_id(topic_id_.c_str(), document->GetAllocator());
+  topic_json->AddMember("topic_id", topic_id, document->GetAllocator());
+  topic_json->AddMember("num_entries",
+      static_cast<uint64_t>(entries_.size()),
+      document->GetAllocator());
+  topic_json->AddMember("version", last_version_, document->GetAllocator());
+
+  int64_t key_size = total_key_size_bytes_;
+  int64_t value_size = total_value_size_bytes_;
+  Value key_size_json(PrettyPrinter::Print(key_size, TUnit::BYTES).c_str(),
+      document->GetAllocator());
+  topic_json->AddMember("key_size", key_size_json, document->GetAllocator());
+  Value value_size_json(PrettyPrinter::Print(value_size, TUnit::BYTES).c_str(),
+      document->GetAllocator());
+  topic_json->AddMember("value_size", value_size_json, document->GetAllocator());
+  Value total_size_json(
+      PrettyPrinter::Print(key_size + value_size, TUnit::BYTES).c_str(),
+      document->GetAllocator());
+  topic_json->AddMember("total_size", total_size_json, document->GetAllocator());
+  topic_json->AddMember("prioritized", IsPrioritizedTopic(topic_id_),
+      document->GetAllocator());
+}
+
 Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
     const RegistrationId& registration_id, const TNetworkAddress& network_address,
     const vector<TTopicRegistration>& subscribed_topics)
@@ -181,47 +287,94 @@ Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
       registration_id_(registration_id),
       network_address_(network_address) {
   for (const TTopicRegistration& topic: subscribed_topics) {
-    TopicState topic_state;
-    topic_state.is_transient = topic.is_transient;
-    topic_state.last_version = TOPIC_INITIAL_VERSION;
-    subscribed_topics_[topic.topic_name] = topic_state;
+    GetTopicsMapForId(topic.topic_name)->emplace(piecewise_construct,
+        forward_as_tuple(topic.topic_name), forward_as_tuple(topic.is_transient));
   }
 }
 
-void Statestore::Subscriber::AddTransientUpdate(const TopicId& topic_id,
-    const TopicEntryKey& topic_key, TopicEntry::Version version) {
+bool Statestore::Subscriber::AddTransientEntries(const TopicId& topic_id,
+    const vector<TTopicItem>& entries,
+    const vector<TopicEntry::Version>& entry_versions) {
+  lock_guard<mutex> l(transient_entry_lock_);
+  DCHECK_EQ(entries.size(), entry_versions.size());
   // Only record the update if the topic is transient
-  const Topics::const_iterator topic_it = subscribed_topics_.find(topic_id);
-  DCHECK(topic_it != subscribed_topics_.end());
-  if (topic_it->second.is_transient == true) {
-    transient_entries_[make_pair(topic_id, topic_key)] = version;
+  Topics* subscribed_topics = GetTopicsMapForId(topic_id);
+  Topics::iterator topic_it = subscribed_topics->find(topic_id);
+  DCHECK(topic_it != subscribed_topics->end());
+  if (topic_it->second.is_transient) {
+    if (unregistered_) return false;
+    for (int i = 0; i < entries.size(); ++i) {
+      topic_it->second.transient_entries_[entries[i].key] = entry_versions[i];
+    }
   }
+  return true;
+}
+
+void Statestore::Subscriber::DeleteAllTransientEntries(TopicMap* global_topics) {
+  lock_guard<mutex> l(transient_entry_lock_);
+  for (const Topics* subscribed_topics :
+      {&priority_subscribed_topics_, &non_priority_subscribed_topics_}) {
+    for (const auto& topic : *subscribed_topics) {
+      auto global_topic_it = global_topics->find(topic.first);
+      DCHECK(global_topic_it != global_topics->end());
+      for (auto& transient_entry : topic.second.transient_entries_) {
+        global_topic_it->second.DeleteIfVersionsMatch(transient_entry.second,
+            transient_entry.first);
+      }
+    }
+  }
+  unregistered_ = true;
+}
+
+int64_t Statestore::Subscriber::NumTransientEntries() {
+  lock_guard<mutex> l(transient_entry_lock_);
+  int64_t num_entries = 0;
+  for (const Topics* subscribed_topics :
+      {&priority_subscribed_topics_, &non_priority_subscribed_topics_}) {
+    for (const auto& topic : *subscribed_topics) {
+      num_entries += topic.second.transient_entries_.size();
+    }
+  }
+  return num_entries;
 }
 
 Statestore::TopicEntry::Version Statestore::Subscriber::LastTopicVersionProcessed(
     const TopicId& topic_id) const {
-  Topics::const_iterator itr = subscribed_topics_.find(topic_id);
-  return itr == subscribed_topics_.end() ?
-      TOPIC_INITIAL_VERSION : itr->second.last_version;
+  const Topics& subscribed_topics = GetTopicsMapForId(topic_id);
+  Topics::const_iterator itr = subscribed_topics.find(topic_id);
+  return itr == subscribed_topics.end() ? TOPIC_INITIAL_VERSION
+                                        : itr->second.last_version.Load();
 }
 
 void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_id,
     TopicEntry::Version version) {
-  subscribed_topics_[topic_id].last_version = version;
+  // Safe to call concurrently for different topics because 'subscribed_topics' is not
+  // modified.
+  Topics* subscribed_topics = GetTopicsMapForId(topic_id);
+  Topics::iterator topic_it = subscribed_topics->find(topic_id);
+  DCHECK(topic_it != subscribed_topics->end());
+  topic_it->second.last_version.Store(version);
 }
 
 Statestore::Statestore(MetricGroup* metrics)
-  : exit_flag_(false),
-    subscriber_topic_update_threadpool_("statestore-update",
+  : subscriber_topic_update_threadpool_("statestore-update",
         "subscriber-update-worker",
         FLAGS_statestore_num_update_threads,
         FLAGS_statestore_max_subscribers,
-        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, false, _1, _2)),
+        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
+          UpdateKind::TOPIC_UPDATE, _1, _2)),
+    subscriber_priority_topic_update_threadpool_("statestore-priority-update",
+        "subscriber-priority-update-worker",
+        FLAGS_statestore_num_priority_update_threads,
+        FLAGS_statestore_max_subscribers,
+        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
+          UpdateKind::PRIORITY_TOPIC_UPDATE, _1, _2)),
     subscriber_heartbeat_threadpool_("statestore-heartbeat",
         "subscriber-heartbeat-worker",
         FLAGS_statestore_num_heartbeat_threads,
         FLAGS_statestore_max_subscribers,
-        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, _2)),
+        bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this,
+          UpdateKind::HEARTBEAT, _1, _2)),
     update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000,
         FLAGS_statestore_update_tcp_timeout_seconds * 1000, "",
@@ -245,6 +398,8 @@ Statestore::Statestore(MetricGroup* metrics)
 
   topic_update_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_UPDATE_DURATION);
+  priority_topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(
+      metrics, STATESTORE_PRIORITY_UPDATE_DURATION);
   heartbeat_duration_metric_ =
       StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_HEARTBEAT_DURATION);
 
@@ -254,6 +409,7 @@ Statestore::Statestore(MetricGroup* metrics)
 
 Status Statestore::Init() {
   RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
+  RETURN_IF_ERROR(subscriber_priority_topic_update_threadpool_.Init());
   RETURN_IF_ERROR(subscriber_heartbeat_threadpool_.Init());
   return Status::OK();
 }
@@ -275,43 +431,21 @@ void Statestore::RegisterWebpages(Webserver* webserver) {
 void Statestore::TopicsHandler(const Webserver::ArgumentMap& args,
     Document* document) {
   lock_guard<mutex> l(subscribers_lock_);
-  lock_guard<mutex> t(topic_lock_);
+  shared_lock<shared_mutex> t(topics_map_lock_);
 
   Value topics(kArrayType);
 
-  for (const TopicMap::value_type& topic: topics_) {
+  for (TopicMap::value_type& topic: topics_) {
     Value topic_json(kObjectType);
-
-    Value topic_id(topic.second.id().c_str(), document->GetAllocator());
-    topic_json.AddMember("topic_id", topic_id, document->GetAllocator());
-    topic_json.AddMember("num_entries",
-        static_cast<uint64_t>(topic.second.entries().size()),
-        document->GetAllocator());
-    topic_json.AddMember(
-        "version", topic.second.last_version(), document->GetAllocator());
-
+    topic.second.ToJson(document, &topic_json);
     SubscriberId oldest_subscriber_id;
     TopicEntry::Version oldest_subscriber_version =
         GetMinSubscriberTopicVersion(topic.first, &oldest_subscriber_id);
-
     topic_json.AddMember("oldest_version", oldest_subscriber_version,
         document->GetAllocator());
     Value oldest_id(oldest_subscriber_id.c_str(), document->GetAllocator());
     topic_json.AddMember("oldest_id", oldest_id, document->GetAllocator());
-
-    int64_t key_size = topic.second.total_key_size_bytes();
-    int64_t value_size = topic.second.total_value_size_bytes();
-    Value key_size_json(PrettyPrinter::Print(key_size, TUnit::BYTES).c_str(),
-        document->GetAllocator());
-    topic_json.AddMember("key_size", key_size_json, document->GetAllocator());
-    Value value_size_json(PrettyPrinter::Print(value_size, TUnit::BYTES).c_str(),
-        document->GetAllocator());
-    topic_json.AddMember("value_size", value_size_json, document->GetAllocator());
-    Value total_size_json(
-        PrettyPrinter::Print(key_size + value_size, TUnit::BYTES).c_str(),
-        document->GetAllocator());
-    topic_json.AddMember("total_size", total_size_json, document->GetAllocator());
-    topics.PushBack(topic_json, document->GetAllocator());
+    topics.PushBack(topic_json, document->GetAllocator());
   }
   document->AddMember("topics", topics, document->GetAllocator());
 }
@@ -330,11 +464,18 @@ void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args,
         document->GetAllocator());
     sub_json.AddMember("address", address, document->GetAllocator());
 
+    int64_t num_priority_topics =
+        subscriber.second->priority_subscribed_topics().size();
+    int64_t num_non_priority_topics =
+        subscriber.second->non_priority_subscribed_topics().size();
     sub_json.AddMember("num_topics",
-        static_cast<uint64_t>(subscriber.second->subscribed_topics().size()),
+        static_cast<uint64_t>(num_priority_topics + num_non_priority_topics),
+        document->GetAllocator());
+    sub_json.AddMember("num_priority_topics",
+        static_cast<uint64_t>(num_priority_topics),
         document->GetAllocator());
     sub_json.AddMember("num_transient",
-        static_cast<uint64_t>(subscriber.second->transient_entries().size()),
+        static_cast<uint64_t>(subscriber.second->NumTransientEntries()),
         document->GetAllocator());
 
     Value registration_id(PrintId(subscriber.second->registration_id()).c_str(),
@@ -378,14 +519,20 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
   // Create any new topics first, so that when the subscriber is first sent a topic update
   // by the worker threads its topics are guaranteed to exist.
   {
-    lock_guard<mutex> l(topic_lock_);
+    // Start with a shared read lock when checking the map. In the common case the topic
+    // will already exist, so we don't need to immediately get the exclusive lock and
+    // block other threads.
+    upgrade_lock<shared_mutex> topic_read_lock(topics_map_lock_);
     for (const TTopicRegistration& topic: topic_registrations) {
       TopicMap::iterator topic_it = topics_.find(topic.topic_name);
       if (topic_it == topics_.end()) {
+        // Upgrade to an exclusive lock when modifying the map.
+        upgrade_to_unique_lock<shared_mutex> topic_write_lock(topic_read_lock);
+        LOG(INFO) << "Creating new topic: '" << topic.topic_name
                   << "' on behalf of subscriber: '" << subscriber_id;
-        topics_.insert(make_pair(topic.topic_name, Topic(topic.topic_name,
-            key_size_metric_, value_size_metric_, topic_size_metric_)));
+        topics_.emplace(piecewise_construct, forward_as_tuple(topic.topic_name),
+            forward_as_tuple(topic.topic_name, key_size_metric_, value_size_metric_,
+            topic_size_metric_));
       }
     }
   }
@@ -400,7 +547,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     UUIDToTUniqueId(subscriber_uuid_generator_(), registration_id);
     shared_ptr<Subscriber> current_registration(
         new Subscriber(subscriber_id, *registration_id, location, topic_registrations));
-    subscribers_.insert(make_pair(subscriber_id, current_registration));
+    subscribers_.emplace(subscriber_id, current_registration);
     failure_detector_->UpdateHeartbeat(
         PrintId(current_registration->registration_id()), true);
     num_subscribers_metric_->SetValue(subscribers_.size());
@@ -409,6 +556,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     // Add the subscriber to the update queue, with an immediate schedule.
     ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
+    RETURN_IF_ERROR(OfferUpdate(update, &subscriber_priority_topic_update_threadpool_));
     RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
   }
 
@@ -428,7 +576,8 @@ bool Statestore::FindSubscriber(const SubscriberId& subscriber_id,
   return true;
 }
 
-Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) {
+Status Statestore::SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kind,
+    bool* update_skipped) {
   // Time any successful RPCs (i.e. those for which UpdateState() completed, even though
   // it may have returned an error.)
   MonotonicStopWatch sw;
@@ -436,7 +585,12 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
   // First thing: make a list of updates to send
   TUpdateStateRequest update_state_request;
-  GatherTopicUpdates(*subscriber, &update_state_request);
+  GatherTopicUpdates(*subscriber, update_kind, &update_state_request);
+  // 'subscriber' may not be subscribed to any updates of 'update_kind'.
+  if (update_state_request.topic_deltas.empty()) {
+    *update_skipped = false;
+    return Status::OK();
+  }
 
   // Set the expected registration ID, so that the subscriber can reject this update if
   // they have moved on to a new registration instance.
@@ -452,9 +606,12 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
   RETURN_IF_ERROR(client.DoRpc(
       &StatestoreSubscriberClientWrapper::UpdateState, update_state_request, &response));
 
+  StatsMetric<double>* update_duration_metric =
+      update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE ?
+      priority_topic_update_duration_metric_ : topic_update_duration_metric_;
   status = Status(response.status);
   if (!status.ok()) {
-    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
+    update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
     return status;
   }
 
@@ -463,7 +620,7 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
     // The subscriber skipped processing this update. We don't consider this a failure
     // - subscribers can decide what they do with any update - so, return OK and set
     // update_skipped so the caller can compensate.
-    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
+    update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
     return Status::OK();
   }
 
@@ -476,7 +633,7 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
 
   // Thirdly: perform any / all updates returned by the subscriber
   {
-    lock_guard<mutex> l(topic_lock_);
+    shared_lock<shared_mutex> l(topics_map_lock_);
     for (const TTopicDelta& update: response.topic_updates) {
       TopicMap::iterator topic_it = topics_.find(update.topic_name);
       if (topic_it == topics_.end()) {
@@ -498,80 +655,49 @@ Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped)
         subscriber->SetLastTopicVersionProcessed(topic_it->first, update.from_version);
       }
 
-      Topic* topic = &topic_it->second;
-      for (const TTopicItem& item: update.topic_entries) {
-        subscriber->AddTransientUpdate(update.topic_name, item.key,
-            topic->Put(item.key, item.value, item.deleted));
+      Topic& topic = topic_it->second;
+      // Update the topic and add transient entries separately to avoid holding both
+      // locks at the same time and preventing concurrent topic updates.
+      vector<TopicEntry::Version> entry_versions = topic.Put(update.topic_entries);
+      if (!subscriber->AddTransientEntries(
+          update.topic_name, update.topic_entries, entry_versions)) {
+        // Subscriber was unregistered - clean up the transient entries.
+        for (int i = 0; i < update.topic_entries.size(); ++i) {
+          topic.DeleteIfVersionsMatch(entry_versions[i], update.topic_entries[i].key);
+        }
       }
     }
   }
-  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
+  update_duration_metric->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
   return Status::OK();
 }
 
 void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
-    TUpdateStateRequest* update_state_request) {
+    UpdateKind update_kind, TUpdateStateRequest* update_state_request) {
   {
-    lock_guard<mutex> l(topic_lock_);
-    for (const Subscriber::Topics::value_type& subscribed_topic:
-         subscriber.subscribed_topics()) {
-      TopicMap::const_iterator topic_it = topics_.find(subscribed_topic.first);
+    DCHECK(update_kind == UpdateKind::TOPIC_UPDATE
+        || update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE)
+        << static_cast<int>(update_kind);
+    const bool is_priority = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE;
+    const Subscriber::Topics& subscribed_topics = is_priority
+        ? subscriber.priority_subscribed_topics()
+        : subscriber.non_priority_subscribed_topics();
+    shared_lock<shared_mutex> l(topics_map_lock_);
+    for (const auto& subscribed_topic: subscribed_topics) {
+      auto topic_it = topics_.find(subscribed_topic.first);
       DCHECK(topic_it != topics_.end());
-
       TopicEntry::Version last_processed_version =
           subscriber.LastTopicVersionProcessed(topic_it->first);
-      const Topic& topic = topic_it->second;
 
       TTopicDelta& topic_delta =
           update_state_request->topic_deltas[subscribed_topic.first];
       topic_delta.topic_name = subscribed_topic.first;
-
-      // If the subscriber version is > 0, send this update as a delta. Otherwise, this is
-      // a new subscriber so send them a non-delta update that includes all items in the
-      // topic.
-      topic_delta.is_delta = last_processed_version > Subscriber::TOPIC_INITIAL_VERSION;
-      topic_delta.__set_from_version(last_processed_version);
-
-      TopicUpdateLog::const_iterator next_update =
-          topic.topic_update_log().upper_bound(last_processed_version);
-
-      uint64_t topic_size = 0;
-      for (; next_update != topic.topic_update_log().end(); ++next_update) {
-        TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second);
-        DCHECK(itr != topic.entries().end());
-        const TopicEntry& topic_entry = itr->second;
-        // Don't send deleted entries for non-delta updates.
-        if (!topic_delta.is_delta && topic_entry.is_deleted()) {
-          continue;
-        }
-        topic_delta.topic_entries.push_back(TTopicItem());
-        TTopicItem& topic_item = topic_delta.topic_entries.back();
-        topic_item.key = itr->first;
-        topic_item.value = topic_entry.value();
-        topic_item.deleted = topic_entry.is_deleted();
-        topic_size += topic_item.key.size() + topic_item.value.size();
-      }
-
-      if (!topic_delta.is_delta &&
-          topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
-        VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
-                   << " topic update for " << subscriber.id() << ". Size = "
-                   << PrettyPrinter::Print(topic_size, TUnit::BYTES);
-      }
-
-      if (topic.topic_update_log().size() > 0) {
-        // The largest version for this topic will be the last item in the version history
-        // map.
-        topic_delta.__set_to_version(topic.topic_update_log().rbegin()->first);
-      } else {
-        // There are no updates in the version history
-        topic_delta.__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
-      }
+      topic_it->second.BuildDelta(subscriber.id(), last_processed_version, &topic_delta);
     }
   }
 
   // Fill in the min subscriber topic version. This must be done after releasing
-  // topic_lock_.
+  // topics_map_lock_.
   lock_guard<mutex> l(subscribers_lock_);
   typedef map<TopicId, TTopicDelta> TopicDeltaMap;
   for (TopicDeltaMap::value_type& topic_delta: update_state_request->topic_deltas) {
@@ -586,8 +712,8 @@ Statestore::TopicEntry::Version Statestore::GetMinSubscriberTopicVersion(
   bool found = false;
   // Find the minimum version processed for this topic across all topic subscribers.
   for (const SubscriberMap::value_type& subscriber: subscribers_) {
-    if (subscriber.second->subscribed_topics().find(topic_id) !=
-        subscriber.second->subscribed_topics().end()) {
+    auto subscribed_topics = subscriber.second->GetTopicsMapForId(topic_id);
+    if (subscribed_topics->find(topic_id) != subscribed_topics->end()) {
       found = true;
       TopicEntry::Version last_processed_version =
           subscriber.second->LastTopicVersionProcessed(topic_id);
@@ -600,15 +726,33 @@ Statestore::TopicEntry::Version Statestore::GetMinSubscriberTopicVersion(
   return found ? min_topic_version : Subscriber::TOPIC_INITIAL_VERSION;
 }
 
-bool Statestore::ShouldExit() {
-  lock_guard<mutex> l(exit_flag_lock_);
-  return exit_flag_;
+bool Statestore::IsPrioritizedTopic(const string& topic) {
+  return topic == IMPALA_MEMBERSHIP_TOPIC || topic == IMPALA_REQUEST_QUEUE_TOPIC;
 }
 
-void Statestore::SetExitFlag() {
-  lock_guard<mutex> l(exit_flag_lock_);
-  exit_flag_ = true;
-  subscriber_topic_update_threadpool_.Shutdown();
+const char* Statestore::GetUpdateKindName(UpdateKind kind) {
+  switch (kind) {
+    case UpdateKind::TOPIC_UPDATE:
+      return "topic update";
+    case UpdateKind::PRIORITY_TOPIC_UPDATE:
+      return "priority topic update";
+    case UpdateKind::HEARTBEAT:
+      return "heartbeat";
+  }
+  DCHECK(false);
+  return "";
+}
+
+ThreadPool<Statestore::ScheduledSubscriberUpdate>* Statestore::GetThreadPool(
+    UpdateKind kind) {
+  switch (kind) {
+    case UpdateKind::TOPIC_UPDATE:
+      return &subscriber_topic_update_threadpool_;
+    case UpdateKind::PRIORITY_TOPIC_UPDATE:
+      return &subscriber_priority_topic_update_threadpool_;
+    case UpdateKind::HEARTBEAT:
+      return &subscriber_heartbeat_threadpool_;
+  }
+  DCHECK(false);
+  return nullptr;
+}
 
 Status Statestore::SendHeartbeat(Subscriber* subscriber) {
@@ -630,8 +774,9 @@ Status Statestore::SendHeartbeat(Subscriber* subscriber) {
   return Status::OK();
 }
 
-void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
+void Statestore::DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
     const ScheduledSubscriberUpdate& update) {
+  const bool is_heartbeat = update_kind == UpdateKind::HEARTBEAT;
   int64_t update_deadline = update.deadline;
   shared_ptr<Subscriber> subscriber;
   // Check if the subscriber has re-registered, in which case we can ignore
@@ -639,7 +784,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
   if (!FindSubscriber(update.subscriber_id, update.registration_id, &subscriber)) {
     return;
   }
-  const string hb_type = is_heartbeat ? "heartbeat" : "topic update";
+  const char* update_kind_str = GetUpdateKindName(update_kind);
   if (update_deadline != 0) {
     // Wait until deadline.
     int64_t diff_ms = update_deadline - UnixMillis();
@@ -654,7 +799,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
       return;
     }
     diff_ms = std::abs(diff_ms);
-    VLOG(3) << "Sending " << hb_type << " message to: " << update.subscriber_id
+    VLOG(3) << "Sending " << update_kind_str << " message to: " << update.subscriber_id
         << " (deadline accuracy: " << diff_ms << "ms)";
 
     if (diff_ms > DEADLINE_MISS_THRESHOLD_MS && is_heartbeat) {
@@ -663,7 +808,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
           "consider increasing --statestore_heartbeat_frequency_ms (currently $3) on "
           "this Statestore, and --statestore_subscriber_timeout_seconds "
           "on subscribers (Impala daemons and the Catalog Server)",
-          update.subscriber_id, hb_type, diff_ms,
+          update.subscriber_id, update_kind_str, diff_ms,
           FLAGS_statestore_heartbeat_frequency_ms);
       LOG(WARNING) << msg;
     }
@@ -674,7 +819,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
   } else {
     // The first update is scheduled immediately and has a deadline of 0. There's no need
     // to wait.
-    VLOG(3) << "Initial " << hb_type << " message for: " << update.subscriber_id;
+    VLOG(3) << "Initial " << update_kind_str << " message for: " << update.subscriber_id;
   }
 
   // Send the right message type, and compute the next deadline
@@ -692,18 +837,21 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
     deadline_ms = UnixMillis() + FLAGS_statestore_heartbeat_frequency_ms;
   } else {
     bool update_skipped;
-    status = SendTopicUpdate(subscriber.get(), &update_skipped);
+    status = SendTopicUpdate(subscriber.get(), update_kind, &update_skipped);
     if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
       // Add details to status to make it more useful, while preserving the stack
       status.AddDetail(Substitute(
           "Subscriber $0 timed-out during topic-update RPC. Timeout is $1s.",
           subscriber->id(), FLAGS_statestore_update_tcp_timeout_seconds));
     }
-    // If the subscriber responded that it skipped the last update sent, we assume that
-    // it was busy doing something else, and back off slightly before sending another.
-    int64_t update_interval = update_skipped ?
-        (2 * FLAGS_statestore_update_frequency_ms) :
-        FLAGS_statestore_update_frequency_ms;
+    // If the subscriber responded that it skipped a topic in the last update sent,
+    // we assume that it was busy doing something else, and back off slightly before
+    // sending another.
+    int64_t update_frequency = update_kind == UpdateKind::PRIORITY_TOPIC_UPDATE
+        ? FLAGS_statestore_priority_update_frequency_ms
+        : FLAGS_statestore_update_frequency_ms;
+    int64_t update_interval =
+        update_skipped ? (2 * update_frequency) : update_frequency;
     deadline_ms = UnixMillis() + update_interval;
   }
 
@@ -715,7 +863,7 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
     if (it == subscribers_.end() ||
         it->second->registration_id() != update.registration_id) return;
     if (!status.ok()) {
-      LOG(INFO) << "Unable to send " << hb_type << " message to subscriber "
+      LOG(INFO) << "Unable to send " << update_kind_str << " message to subscriber "
                 << update.subscriber_id << ", received error: " << status.GetDetail();
     }
 
@@ -736,10 +884,9 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
       VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " deadline for: "
               << subscriber->id() << " is in " << deadline_ms << "ms";
       status = OfferUpdate(ScheduledSubscriberUpdate(deadline_ms, subscriber->id(),
-          subscriber->registration_id()), is_heartbeat ?
-          &subscriber_heartbeat_threadpool_ : &subscriber_topic_update_threadpool_);
+          subscriber->registration_id()), GetThreadPool(update_kind));
       if (!status.ok()) {
-        LOG(INFO) << "Unable to send next " << (is_heartbeat ? "heartbeat" : "update")
+        LOG(INFO) << "Unable to send next " << update_kind_str
                   << " message to subscriber '" << subscriber->id() << "': "
                   << status.GetDetail();
       }
@@ -763,14 +910,11 @@ void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
   failure_detector_->EvictPeer(PrintId(subscriber->registration_id()));
 
   // Delete all transient entries
-  lock_guard<mutex> topic_lock(topic_lock_);
-  for (Statestore::Subscriber::TransientEntryMap::value_type entry:
-       subscriber->transient_entries()) {
-    Statestore::TopicMap::iterator topic_it = topics_.find(entry.first.first);
-    DCHECK(topic_it != topics_.end());
-    topic_it->second.DeleteIfVersionsMatch(entry.second, // version
-        entry.first.second); // key
+  {
+    shared_lock<shared_mutex> topic_lock(topics_map_lock_);
+    subscriber->DeleteAllTransientEntries(&topics_);
   }
+
   num_subscribers_metric_->Increment(-1L);
   subscriber_set_metric_->Remove(subscriber->id());
   subscribers_.erase(subscriber->id());
@@ -778,4 +922,6 @@ void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
 
 void Statestore::MainLoop() {
   subscriber_topic_update_threadpool_.Join();
+  subscriber_priority_topic_update_threadpool_.Join();
+  subscriber_heartbeat_threadpool_.Join();
 }
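
To make the transient-entry handling above concrete, here is a minimal standalone sketch (illustrative only, not Impala code) of the version-matching scheme: Put() assigns each entry a monotonically increasing version, the subscriber remembers the version of each entry it wrote, and cleanup on unregistration deletes an entry only if that version is unchanged, so values later overwritten by another subscriber are left alone. The class, key and value names below are invented for the example.

#include <cstdint>
#include <string>
#include <unordered_map>

struct Entry {
  std::string value;
  int64_t version = 0;
  bool deleted = false;
};

class Topic {
 public:
  // Insert or overwrite 'key', assigning it a new version; returns that version.
  int64_t Put(const std::string& key, const std::string& value) {
    Entry& entry = entries_[key];
    entry.value = value;
    entry.deleted = false;
    entry.version = ++last_version_;
    return entry.version;
  }

  // Mark 'key' as deleted only if its version still matches 'version', i.e. it was
  // not overwritten by a later Put() from a different subscriber.
  void DeleteIfVersionsMatch(int64_t version, const std::string& key) {
    auto it = entries_.find(key);
    if (it != entries_.end() && it->second.version == version) {
      it->second.deleted = true;
      it->second.version = ++last_version_;
    }
  }

 private:
  std::unordered_map<std::string, Entry> entries_;
  int64_t last_version_ = 0;
};

int main() {
  Topic topic;
  // Subscriber A writes a transient entry and records the returned version.
  int64_t version_a = topic.Put("backend:host1", "alive");
  // Subscriber B overwrites the same key before A unregisters.
  topic.Put("backend:host1", "alive-v2");
  // A's cleanup is a no-op because the version no longer matches, so B's value survives.
  topic.DeleteIfVersionsMatch(version_a, "backend:host1");
  return 0;
}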


[5/6] impala git commit: IMPALA-4953, IMPALA-6437: separate AC/scheduler from catalog topic updates

Posted by ta...@apache.org.
http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index deeb5aa..3058e94 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -20,13 +20,16 @@
 
 #include <cstdint>
 #include <map>
+#include <memory>
 #include <string>
 #include <vector>
 
+#include <boost/thread/shared_mutex.hpp>
 #include <boost/scoped_ptr.hpp>
 #include <boost/unordered_map.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 
+#include "common/atomic.h"
 #include "common/status.h"
 #include "gen-cpp/StatestoreService.h"
 #include "gen-cpp/StatestoreSubscriber.h"
@@ -51,38 +54,60 @@ typedef TUniqueId RegistrationId;
 
 /// The Statestore is a soft-state key-value store that maintains a set of Topics, which
 /// are maps from string keys to byte array values.
-//
+///
 /// Topics are subscribed to by subscribers, which are remote clients of the statestore
 /// which express an interest in some set of Topics. The statestore sends topic updates to
 /// subscribers via periodic 'update' messages, and also sends periodic 'heartbeat'
-/// messages, which are used to detect the liveness of a subscriber.
-//
+/// messages, which are used to detect the liveness of a subscriber. Updates for each
+/// topic are delivered sequentially to each subscriber per subscription. E.g. if a
+/// subscriber is subscribed to a topic "foo", the statestore will not deliver updates
+/// for "foo" to that subscriber out-of-order or concurrently, but updates for "foo" may
+/// be sent concurrently with, or out-of-order relative to, updates for another topic
+/// "bar".
+///
 /// In response to 'update' messages, subscribers, send topic updates to the statestore to
 /// merge with the current topic. These updates are then sent to all other subscribers in
-/// their next update message. The next message is scheduled for
-/// FLAGS_statestore_update_frequency_ms in the future, unless the subscriber indicated
-/// that it skipped processing an update, in which case the statestore will back off
-/// slightly before re-sending the same update.
-//
+/// their next update message. The next message is scheduled one update period in the
+/// future, unless the subscriber indicated that it skipped processing an update, in which
+/// case the statestore will back off slightly before re-sending a new update. The
+/// update period is determined by FLAGS_statestore_update_frequency_ms or
+/// FLAGS_statestore_priority_update_frequency_ms, depending on whether the topic is a
+/// prioritized topic.
+///
+/// Prioritized topics are topics that are small but important to deliver in a timely
+/// manner. Handling those topics in a separate threadpool prevents large updates of other
+/// topics from slowing or blocking dissemination of updates to prioritized topics.
+///
 /// Topic entries usually have human-readable keys, and values which are some serialised
 /// representation of a data structure, e.g. a Thrift struct. The contents of a value's
 /// byte string is opaque to the statestore, which maintains no information about how to
 /// deserialise it. Subscribers must use convention to interpret each other's updates.
-//
+///
 /// A subscriber may have marked some updates that it made as 'transient', which implies
 /// that those entries should be deleted once the subscriber is no longer connected (this
 /// is judged by the statestore's failure-detector, which will mark a subscriber as failed
 /// when it has not responded to a number of successive heartbeat messages). Transience
 /// is tracked per-topic-per-subscriber, so two different subscribers may treat the same
 /// topic differently with respect to the transience of their updates.
-//
+///
 /// The statestore tracks the history of updates to each topic, with each topic update
 /// getting a sequentially increasing version number that is unique across the topic.
-//
+///
 /// Subscribers also track the max version of each topic which they have successfully
 /// processed. The statestore can use this information to send a delta of updates to a
 /// subscriber, rather than all items in the topic.  For non-delta updates, the statestore
 /// will send an update that includes all values in the topic.
+///
+/// +================+
+/// | Implementation |
+/// +================+
+///
+/// Locking:
+/// --------
+/// The lock acquisition order is:
+/// 1. 'subscribers_lock_'
+/// 2. 'topics_map_lock_'
+/// 3. Subscriber::transient_entry_lock_
+/// 4. Topic::lock_ (terminal)
 class Statestore : public CacheLineAligned {
  public:
   /// A SubscriberId uniquely identifies a single subscriber, and is
@@ -125,10 +150,14 @@ class Statestore : public CacheLineAligned {
     return thrift_iface_;
   }
 
-  /// Tells the Statestore to shut down. Does not wait for the processing loop to exit
-  /// before returning.
-  void SetExitFlag();
-
+  /// Names of prioritized topics that are handled in a separate threadpool. The topic
+  /// names are hardcoded here for expediency. Ideally we would have a more generic
+  /// interface for specifying prioritized topics, but for now we only have a small
+  /// fixed set of topics.
+  /// Topic tracking the set of live Impala daemon instances.
+  static const std::string IMPALA_MEMBERSHIP_TOPIC;
+  /// Topic tracking the state of admission control on all coordinators.
+  static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
  private:
   /// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
   /// pair.
@@ -140,7 +169,7 @@ class Statestore : public CacheLineAligned {
     /// A version is a monotonically increasing counter. Each update to a topic has its own
     /// unique version with the guarantee that sequentially later updates have larger
     /// version numbers.
-    typedef uint64_t Version;
+    typedef int64_t Version;
 
     /// The Version value used to initialize a new TopicEntry.
     static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L;
@@ -199,14 +228,14 @@ class Statestore : public CacheLineAligned {
           total_value_size_bytes_(0L), key_size_metric_(key_size_metric),
           value_size_metric_(value_size_metric), topic_size_metric_(topic_size_metric) { }
 
-    /// Adds an entry with the given key and value (bytes). If is_deleted is
-    /// true the entry is considered deleted, and may be garbage collected in the future.
-    /// The entry is assigned a new version number by the Topic, and that version number
-    /// is returned.
-    //
-    /// Must be called holding the topic lock
-    TopicEntry::Version Put(const TopicEntryKey& key, const TopicEntry::Value& bytes,
-        bool is_deleted);
+    /// Add entries with the given keys and values. If is_deleted is true for an entry,
+    /// it is considered deleted, and may be garbage collected in the future. Each entry
+    /// is assigned a new version number by the Topic, and the version numbers are
+    /// returned.
+    ///
+    /// Safe to call concurrently from multiple threads (for different subscribers).
+    /// Acquires an exclusive write lock for the topic.
+    std::vector<TopicEntry::Version> Put(const std::vector<TTopicItem>& entries);
 
     /// Utility method to support removing transient entries. We track the version numbers
     /// of entries added by subscribers, and remove entries with the same version number
@@ -215,26 +244,37 @@ class Statestore : public CacheLineAligned {
     //
     /// Deletion means marking the entry as deleted and incrementing its version
     /// number.
-    //
-    /// Must be called holding the topic lock
+    ///
+    /// Safe to call concurrently from multiple threads (for different subscribers).
+    /// Acquires an exclusive write lock for the topic.
     void DeleteIfVersionsMatch(TopicEntry::Version version, const TopicEntryKey& key);
 
-    const TopicId& id() const { return topic_id_; }
-    const TopicEntryMap& entries() const { return entries_; }
-    TopicEntry::Version last_version() const { return last_version_; }
-    const TopicUpdateLog& topic_update_log() const { return topic_update_log_; }
-    int64_t total_key_size_bytes() const { return total_key_size_bytes_; }
-    int64_t total_value_size_bytes() const { return total_value_size_bytes_; }
+    /// Build a delta update to send to 'subscriber_id', including all entries with a
+    /// version greater than 'last_processed_version'.
+    ///
+    /// Safe to call concurrently from multiple threads (for different subscribers).
+    /// Acquires a shared read lock for the topic.
+    void BuildDelta(const SubscriberId& subscriber_id,
+        TopicEntry::Version last_processed_version, TTopicDelta* delta);
 
+    /// Adds entries representing the current topic state to 'topic_json'.
+    void ToJson(rapidjson::Document* document, rapidjson::Value* topic_json);
    private:
-    /// Map from topic entry key to topic entry.
-    TopicEntryMap entries_;
-
     /// Unique identifier for this topic. Should be human-readable.
     const TopicId topic_id_;
 
-    /// Tracks the last version that was assigned to an entry in this Topic. Incremented on
-    /// every Put() so each TopicEntry is tagged with a unique version value.
+    /// Reader-writer lock to protect state below. This is a terminal lock - no
+    /// other locks should be acquired while holding this one. boost::shared_mutex
+    /// gives writers priority over readers in acquiring the lock, which prevents
+    /// starvation.
+    boost::shared_mutex lock_;
+
+    /// Map from topic entry key to topic entry.
+    TopicEntryMap entries_;
+
+    /// Tracks the last version that was assigned to an entry in this Topic. Incremented
+    /// every time an entry is added in Put() so each TopicEntry is tagged with a unique
+    /// version value.
     TopicEntry::Version last_version_;
 
     /// Contains a history of updates to this Topic, with each key being a Version and the
@@ -259,18 +299,12 @@ class Statestore : public CacheLineAligned {
     IntGauge* topic_size_metric_;
   };
 
-  /// Note on locking: Subscribers and Topics should be accessed under their own coarse
-  /// locks, and worker threads will use worker_lock_ to ensure safe access to the
-  /// subscriber work queue.
-
-  /// Protects access to exit_flag_, but is used mostly to ensure visibility of updates
-  /// between threads..
-  boost::mutex exit_flag_lock_;
-
-  bool exit_flag_;
-
-  /// Controls access to topics_. Cannot take subscribers_lock_ after acquiring this lock.
-  boost::mutex topic_lock_;
+  /// Protects the 'topics_' map. Should be held shared when reading or holding a
+  /// reference to entries in the map and exclusively when modifying the map.
+  /// See the class comment for the lock acquisition order. boost::shared_mutex
+  /// gives writers priority over readers in acquiring the lock, which prevents
+  /// starvation.
+  boost::shared_mutex topics_map_lock_;
 
   /// The entire set of topics tracked by the statestore
   typedef boost::unordered_map<TopicId, Topic> TopicMap;
@@ -287,40 +321,68 @@ class Statestore : public CacheLineAligned {
         const TNetworkAddress& network_address,
         const std::vector<TTopicRegistration>& subscribed_topics);
 
-    /// The TopicState contains information on whether entries written by this subscriber
-    /// should be considered transient, as well as the last topic entry version
-    /// successfully processed by this subscriber.
-    struct TopicState {
-      bool is_transient;
-      TopicEntry::Version last_version;
+    /// Information about a subscriber's subscription to a specific topic.
+    struct TopicSubscription {
+      TopicSubscription(bool is_transient) : is_transient(is_transient) {}
+
+      /// Whether entries written by this subscriber should be considered transient.
+      const bool is_transient;
+
+      /// The last topic entry version successfully processed by this subscriber. Only
+      /// written by a single thread at a time but can be read concurrently.
+      AtomicInt64 last_version{TOPIC_INITIAL_VERSION};
+
+      /// Map from the key to the version of a transient update made by this subscriber.
+      /// Protected by Subscriber::transient_entry_lock_.
+      boost::unordered_map<TopicEntryKey, TopicEntry::Version> transient_entries_;
     };
 
     /// The set of topics subscribed to, and current state (as seen by this subscriber) of
     /// the topic.
-    typedef boost::unordered_map<TopicId, TopicState> Topics;
+    typedef boost::unordered_map<TopicId, TopicSubscription> Topics;
 
     /// The Version value used to initialize new Topic subscriptions for this Subscriber.
     static const TopicEntry::Version TOPIC_INITIAL_VERSION;
 
-    const Topics& subscribed_topics() const { return subscribed_topics_; }
+    const Topics& non_priority_subscribed_topics() const {
+      return non_priority_subscribed_topics_;
+    }
+    const Topics& priority_subscribed_topics() const { return priority_subscribed_topics_; }
     const TNetworkAddress& network_address() const { return network_address_; }
     const SubscriberId& id() const { return subscriber_id_; }
     const RegistrationId& registration_id() const { return registration_id_; }
 
-    /// Records the fact that an update to this topic is owned by this subscriber.  The
-    /// version number of the update is saved so that only those updates which are made
-    /// most recently by this subscriber - and not overwritten by another subscriber - are
-    /// deleted on failure. If the topic the entry belongs to is not marked as transient,
-    /// no update will be recorded.
-    void AddTransientUpdate(const TopicId& topic_id, const TopicEntryKey& topic_key,
-        TopicEntry::Version version);
-
-    /// Map from the topic / key pair to the version of a transient update made by this
-    /// subscriber.
-    typedef boost::unordered_map<std::pair<TopicId, TopicEntryKey>, TopicEntry::Version>
-        TransientEntryMap;
-
-    const TransientEntryMap& transient_entries() const { return transient_entries_; }
+    /// Get the Topics map that would be used to store 'topic_id'.
+    const Topics& GetTopicsMapForId(const TopicId& topic_id) const {
+      return IsPrioritizedTopic(topic_id) ? priority_subscribed_topics_
+                                          : non_priority_subscribed_topics_;
+    }
+    Topics* GetTopicsMapForId(const TopicId& topic_id) {
+      return IsPrioritizedTopic(topic_id) ? &priority_subscribed_topics_
+                                          : &non_priority_subscribed_topics_;
+    }
+
+    /// Records the fact that updates to this topic are owned by this subscriber.  The
+    /// version number of each update (which must be at the corresponding index in
+    /// 'entry_versions') is saved so that only updates made most recently by this
+    /// subscriber - and not overwritten by another subscriber - are deleted on failure.
+    /// If the topic the entries belong to is not marked as transient, no updates
+    /// will be recorded. Should not be called concurrently from multiple threads for a
+    /// given 'topic_id'.
+    ///
+    /// Returns false if DeleteAllTransientEntries() was called and 'topic_id' entries
+    /// are transient, in which case the caller should delete the entries themselves.
+    bool AddTransientEntries(const TopicId& topic_id,
+        const std::vector<TTopicItem>& entries,
+        const std::vector<TopicEntry::Version>& entry_versions) WARN_UNUSED_RESULT;
+
+    /// Delete all transient topic entries for this subscriber from 'global_topics'.
+    ///
+    /// Statestore::topics_map_lock_ (in shared mode) must be held by the caller.
+    void DeleteAllTransientEntries(TopicMap* global_topics);
+
+    /// Returns the number of transient entries.
+    int64_t NumTransientEntries();
 
     /// Returns the last version of the topic which this subscriber has successfully
     /// processed. Will never decrease.
@@ -328,7 +390,8 @@ class Statestore : public CacheLineAligned {
 
     /// Sets the subscriber's last processed version of the topic to the given value.  This
     /// should only be set once a subscriber has successfully processed the given
-    /// update corresponding to this version.
+    /// update corresponding to this version. Should not be called concurrently from
+    /// multiple threads for a given 'topic_id'.
     void SetLastTopicVersionProcessed(const TopicId& topic_id,
         TopicEntry::Version version);
 
@@ -346,19 +409,25 @@ class Statestore : public CacheLineAligned {
     /// The location of the subscriber service that this subscriber runs.
     const TNetworkAddress network_address_;
 
-    /// Map of topic subscriptions to current TopicState. The the state describes whether
-    /// updates on the topic are 'transient' (i.e., to be deleted upon subscriber failure)
-    /// or not and contains the version number of the last update processed by this
-    /// Subscriber on the topic.
-    Topics subscribed_topics_;
-
-    /// List of updates made by this subscriber so that transient entries may be deleted on
-    /// failure.
-    TransientEntryMap transient_entries_;
+    /// Maps of topic subscriptions to current TopicSubscription, with separate maps for
+    /// priority and non-priority topics. The state describes whether updates on the
+    /// topic are 'transient' (i.e., to be deleted upon subscriber failure) or not
+    /// and contains the version number of the last update processed by this Subscriber
+    /// on the topic. The set of keys is not modified after construction.
+    Topics priority_subscribed_topics_;
+    Topics non_priority_subscribed_topics_;
+
+    /// Lock held when adding or deleting transient entries. See class comment for lock
+    /// acquisition order.
+    boost::mutex transient_entry_lock_;
+
+    /// True once DeleteAllTransientEntries() has been called during subscriber
+    /// unregistration. Protected by 'transient_entry_lock_'.
+    bool unregistered_ = false;
   };
 
-  /// Protects access to subscribers_ and subscriber_uuid_generator_. Must be taken before
-  /// topic_lock_.
+  /// Protects access to subscribers_ and subscriber_uuid_generator_. See the class
+  /// comment for the lock acquisition order.
   boost::mutex subscribers_lock_;
 
   /// Map of subscribers currently connected; upon failure their entry is removed from this
@@ -393,11 +462,12 @@ class Statestore : public CacheLineAligned {
         registration_id(r_id) {}
   };
 
-  /// The statestore has two pools of threads that send messages to subscribers
+  /// The statestore has three pools of threads that send messages to subscribers
   /// one-by-one. One pool deals with 'heartbeat' messages that update failure detection
-  /// state, and the other pool sends 'topic update' messages which contain the
-  /// actual topic data that a subscriber does not yet have.
-  //
+  /// state, and the remaining pools send 'topic update' messages that contain the
+  /// actual topic data that a subscriber does not yet have, with one pool dedicated to
+  /// a set of special "prioritized" topics.
+  ///
   /// Each message is scheduled for some time in the future and each worker thread
   /// will sleep until that time has passed to rate-limit messages. Subscribers are
   /// placed back into the queue once they have been processed. A subscriber may have many
@@ -405,24 +475,31 @@ class Statestore : public CacheLineAligned {
   /// subscriber. Since at most one registration is considered 'live' per subscriber, this
   /// guarantees that subscribers_.size() - 1 'live' subscribers ahead of any subscriber in
   /// the queue.
-  //
+  ///
   /// Messages may be delayed for any number of reasons, including scheduler
   /// interference, lock unfairness when submitting to the thread pool and head-of-line
   /// blocking when threads are occupied sending messages to slow subscribers
   /// (subscribers are not guaranteed to be in the queue in next-update order).
-  //
+  ///
   /// Delays for heartbeat messages can result in the subscriber that is kept waiting
   /// assuming that the statestore has failed. Correct configuration of heartbeat message
   /// frequency and subscriber timeout is therefore very important, and depends upon the
   /// cluster size. See --statestore_heartbeat_frequency_ms and
   /// --statestore_subscriber_timeout_seconds. We expect that the provided defaults will
   /// work up to clusters of several hundred nodes.
-  //
+  ///
   /// Subscribers are therefore not processed in lock-step, and one subscriber may have
   /// seen many more messages than another during the same interval (if the second
   /// subscriber runs slow for any reason).
+  enum class UpdateKind {
+    TOPIC_UPDATE,
+    PRIORITY_TOPIC_UPDATE,
+    HEARTBEAT
+  };
   ThreadPool<ScheduledSubscriberUpdate> subscriber_topic_update_threadpool_;
 
+  ThreadPool<ScheduledSubscriberUpdate> subscriber_priority_topic_update_threadpool_;
+
   ThreadPool<ScheduledSubscriberUpdate> subscriber_heartbeat_threadpool_;
 
   /// Cache of subscriber clients used for UpdateState() RPCs. Only one client per
@@ -452,10 +529,11 @@ class Statestore : public CacheLineAligned {
   IntGauge* value_size_metric_;
   IntGauge* topic_size_metric_;
 
-  /// Tracks the distribution of topic-update durations - precisely the time spent in
-  /// calling the UpdateState() RPC which allows us to measure the network transmission
-  /// cost as well as the subscriber-side processing time.
+  /// Tracks the distribution of topic-update durations for regular and prioritized topic
+  /// updates. This measures the time spent in calling the UpdateState() RPC which
+  /// includes network transmission cost and subscriber-side processing time.
   StatsMetric<double>* topic_update_duration_metric_;
+  StatsMetric<double>* priority_topic_update_duration_metric_;
 
   /// Same as above, but for SendHeartbeat() RPCs.
   StatsMetric<double>* heartbeat_duration_metric_;
@@ -466,23 +544,25 @@ class Statestore : public CacheLineAligned {
       ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT;
 
   /// Sends either a heartbeat or topic update message to the subscriber in 'update' at
-  /// the closest possible time to the first member of 'update'. If is_heartbeat is true,
-  /// sends a heartbeat update, otherwise the set of pending topic updates is sent. Once
-  /// complete, the next update is scheduled and added to the appropriate queue.
-  void DoSubscriberUpdate(bool is_heartbeat, int thread_id,
+  /// the closest possible time to the first member of 'update'. If 'update_kind' is
+  /// HEARTBEAT, sends a heartbeat update, otherwise the set of priority/non-priority
+  /// pending topic updates is sent. Once complete, the next update is scheduled and
+  /// added to the appropriate queue.
+  void DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
       const ScheduledSubscriberUpdate& update);
 
   /// Does the work of updating a single subscriber, by calling UpdateState() on the client
   /// to send a list of topic deltas to the subscriber. If that call fails (either because
   /// the RPC could not be completed, or the subscriber indicated an error), this method
   /// returns a non-OK status immediately without further processing.
-  //
+  ///
   /// The subscriber may indicated that it skipped processing the message, either because
   /// it was not ready to do so or because it was busy. In that case, the UpdateState() RPC
   /// will return OK (since there was no error) and the output parameter update_skipped is
   /// set to true. Otherwise, any updates returned by the subscriber are applied to their
   /// target topics.
-  Status SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) WARN_UNUSED_RESULT;
+  Status SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kind,
+      bool* update_skipped) WARN_UNUSED_RESULT;
 
   /// Sends a heartbeat message to subscriber. Returns false if there was some error
   /// performing the RPC.
@@ -501,9 +581,10 @@ class Statestore : public CacheLineAligned {
   void UnregisterSubscriber(Subscriber* subscriber);
 
   /// Populates a TUpdateStateRequest with the update state for this subscriber. Iterates
-  /// over all updates in all subscribed topics, populating the given TUpdateStateRequest
-  /// object. Takes the topic_lock_ and subscribers_lock_.
-  void GatherTopicUpdates(const Subscriber& subscriber,
+  /// over all updates in all priority or non-priority subscribed topics, based on
+  /// 'update_kind'. The given TUpdateStateRequest object is populated with the
+  /// changes to the subscribed topics. Takes the topics_map_lock_ and subscribers_lock_.
+  void GatherTopicUpdates(const Subscriber& subscriber, UpdateKind update_kind,
       TUpdateStateRequest* update_state_request);
 
   /// Returns the minimum last processed topic version across all subscribers for the given
@@ -523,8 +604,14 @@ class Statestore : public CacheLineAligned {
   TopicEntry::Version GetMinSubscriberTopicVersion(
       const TopicId& topic_id, SubscriberId* subscriber_id = NULL);
 
-  /// True if the shutdown flag has been set true, false otherwise.
-  bool ShouldExit();
+  /// Returns true if this topic should be handled by the priority pool.
+  static bool IsPrioritizedTopic(const std::string& topic);
+
+  /// Return human-readable name for 'kind'.
+  static const char* GetUpdateKindName(UpdateKind kind);
+
+  /// Return the thread pool to process updates of 'kind'.
+  ThreadPool<ScheduledSubscriberUpdate>* GetThreadPool(UpdateKind kind);
 
   /// Webpage handler: upon return, 'document' will contain a list of topics as follows:
   /// "topics": [
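
The declarations above split subscriber messages into three update kinds, each served by its own thread pool and its own update period. A rough standalone sketch (not the Impala implementation) of that dispatch decision follows, with illustrative flag values; the membership topic name literal is an assumption here, while the request-queue name matches the metric key used in the tests further below.

#include <cstdint>
#include <iostream>
#include <string>

enum class UpdateKind { TOPIC_UPDATE, PRIORITY_TOPIC_UPDATE, HEARTBEAT };

// Stand-ins for --statestore_update_frequency_ms and
// --statestore_priority_update_frequency_ms; the values are illustrative only.
constexpr int64_t kUpdateFrequencyMs = 2000;
constexpr int64_t kPriorityUpdateFrequencyMs = 100;

// Hardcoded set of prioritized topics, in the spirit of
// Statestore::IsPrioritizedTopic(); the exact string values are assumptions.
bool IsPrioritizedTopic(const std::string& topic) {
  return topic == "impala-membership" || topic == "impala-request-queue";
}

// Pick the update period for a given kind; double it if the subscriber skipped
// processing the previous update, mirroring the back-off in DoSubscriberUpdate().
int64_t NextUpdateDelayMs(UpdateKind kind, bool update_skipped) {
  int64_t period = kind == UpdateKind::PRIORITY_TOPIC_UPDATE ? kPriorityUpdateFrequencyMs
                                                             : kUpdateFrequencyMs;
  return update_skipped ? 2 * period : period;
}

int main() {
  UpdateKind kind = IsPrioritizedTopic("impala-request-queue")
      ? UpdateKind::PRIORITY_TOPIC_UPDATE
      : UpdateKind::TOPIC_UPDATE;
  std::cout << "next update in " << NextUpdateDelayMs(kind, false) << " ms\n";
  return 0;
}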

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index df78a0b..b457741 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -972,7 +972,18 @@
     "key": "statestore-subscriber.topic-$0.processing-time-s"
   },
   {
-    "description": "The time (sec) taken to process Statestore subcriber topic updates.",
+    "description": "Interval between topic updates for Topic $0",
+    "contexts": [
+      "CATALOGSERVER",
+      "IMPALAD"
+    ],
+    "label": "Statestore Subscriber Topic $0 Update Interval",
+    "units": "TIME_S",
+    "kind": "STATS",
+    "key": "statestore-subscriber.topic-$0.update-interval"
+  },
+  {
+    "description": "The time (sec) taken to process Statestore subscriber topic updates.",
     "contexts": [
       "CATALOGSERVER",
       "IMPALAD"
@@ -1024,7 +1035,7 @@
     "key": "statestore.live-backends.list"
   },
   {
-    "description": "The time (sec) spent sending topic update RPCs. Includes subscriber-side processing time and network transmission time.",
+    "description": "The time (sec) spent sending non-priority topic update RPCs. Includes subscriber-side processing time and network transmission time.",
     "contexts": [
       "STATESTORE"
     ],
@@ -1034,6 +1045,16 @@
     "key": "statestore.topic-update-durations"
   },
   {
+    "description": "The time (sec) spent sending priority topic update RPCs. Includes subscriber-side processing time and network transmission time.",
+    "contexts": [
+      "STATESTORE"
+    ],
+    "label": "Statestore Priority Topic Update Durations",
+    "units": "TIME_S",
+    "kind": "STATS",
+    "key": "statestore.priority-topic-update-durations"
+  },
+  {
     "description": "The sum of the size of all keys for all topics tracked by the StateStore.",
     "contexts": [
       "STATESTORE"

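The new statestore-subscriber.topic-$0.update-interval metric above records, per topic, the time between successive updates that a subscriber receives for that topic. A small standalone sketch (illustrative only, not the Impala metric implementation) of tracking such intervals with std::chrono; the class and topic key below are invented for the example.

#include <chrono>
#include <iostream>
#include <string>
#include <unordered_map>

using Clock = std::chrono::steady_clock;

class UpdateIntervalTracker {
 public:
  // Record an update for 'topic' and return the interval in seconds since the
  // previous update of the same topic (0.0 for the first update).
  double RecordUpdate(const std::string& topic) {
    const Clock::time_point now = Clock::now();
    double interval_s = 0.0;
    auto it = last_update_.find(topic);
    if (it != last_update_.end()) {
      interval_s = std::chrono::duration<double>(now - it->second).count();
      it->second = now;
    } else {
      last_update_.emplace(topic, now);
    }
    return interval_s;
  }

 private:
  std::unordered_map<std::string, Clock::time_point> last_update_;
};

int main() {
  UpdateIntervalTracker tracker;
  tracker.RecordUpdate("impala-request-queue");
  // ...on the next update of the same topic, the elapsed interval is reported:
  std::cout << tracker.RecordUpdate("impala-request-queue") << " s\n";
  return 0;
}
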
http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/tests/custom_cluster/test_admission_controller.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_admission_controller.py b/tests/custom_cluster/test_admission_controller.py
index 69cabd8..bed6994 100644
--- a/tests/custom_cluster/test_admission_controller.py
+++ b/tests/custom_cluster/test_admission_controller.py
@@ -120,9 +120,13 @@ MAX_NUM_QUEUED_QUERIES = 10
 # Mem limit (bytes) used in the mem limit test
 MEM_TEST_LIMIT = 12 * 1024 * 1024 * 1024
 
-_STATESTORED_ARGS = "-statestore_heartbeat_frequency_ms=%s "\
-                    "-statestore_update_frequency_ms=%s" %\
-                    (STATESTORE_RPC_FREQUENCY_MS, STATESTORE_RPC_FREQUENCY_MS)
+_STATESTORED_ARGS = ("-statestore_heartbeat_frequency_ms={freq_ms} "
+                     "-statestore_priority_update_frequency_ms={freq_ms}").format(
+                    freq_ms=STATESTORE_RPC_FREQUENCY_MS)
+
+# Name of the subscriber metric tracking the admission control update interval.
+REQUEST_QUEUE_UPDATE_INTERVAL =\
+    'statestore-subscriber.topic-impala-request-queue.update-interval'
 
 # Key in the query profile for the query options.
 PROFILE_QUERY_OPTIONS_KEY = "Query Options (set by configuration): "
@@ -551,14 +555,14 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
       sleep(1)
 
   def wait_for_statestore_updates(self, heartbeats):
-    """Waits for a number of statestore heartbeats from all impalads."""
+    """Waits for a number of admission control statestore updates from all impalads."""
     start_time = time()
     num_impalads = len(self.impalads)
     init = dict()
     curr = dict()
     for impalad in self.impalads:
-      init[impalad] = impalad.service.get_metric_value(\
-          'statestore-subscriber.topic-update-interval-time')['count']
+      init[impalad] = impalad.service.get_metric_value(
+          REQUEST_QUEUE_UPDATE_INTERVAL)['count']
       curr[impalad] = init[impalad]
 
     while True:
@@ -566,8 +570,8 @@ class TestAdmissionControllerStress(TestAdmissionControllerBase):
           init.values(), [curr[i] - init[i] for i in self.impalads])
       if all([curr[i] - init[i] >= heartbeats for i in self.impalads]): break
       for impalad in self.impalads:
-        curr[impalad] = impalad.service.get_metric_value(\
-            'statestore-subscriber.topic-update-interval-time')['count']
+        curr[impalad] = impalad.service.get_metric_value(
+            REQUEST_QUEUE_UPDATE_INTERVAL)['count']
       assert (time() - start_time < STRESS_TIMEOUT),\
           "Timed out waiting %s seconds for heartbeats" % (STRESS_TIMEOUT,)
       sleep(STATESTORE_RPC_FREQUENCY_MS / float(1000))

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/tests/statestore/test_statestore.py
----------------------------------------------------------------------
diff --git a/tests/statestore/test_statestore.py b/tests/statestore/test_statestore.py
index 1003dc7..ba51c8d 100644
--- a/tests/statestore/test_statestore.py
+++ b/tests/statestore/test_statestore.py
@@ -15,9 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from collections import defaultdict
 import json
 import socket
 import threading
+import traceback
 import time
 import urllib2
 import uuid
@@ -167,7 +169,10 @@ class StatestoreSubscriber(object):
   more readable."""
   def __init__(self, heartbeat_cb=None, update_cb=None):
     self.heartbeat_event, self.heartbeat_count = threading.Condition(), 0
-    self.update_event, self.update_count = threading.Condition(), 0
+    # Track the number of updates received per topic.
+    self.update_counts = defaultdict(lambda : 0)
+    # Condition variable notified whenever an update for any topic is received.
+    self.update_event = threading.Condition()
     self.heartbeat_cb, self.update_cb = heartbeat_cb, update_cb
     self.exception = None
 
@@ -191,12 +196,14 @@ class StatestoreSubscriber(object):
     """UpdateState RPC handler. Calls update callback if one exists."""
     self.update_event.acquire()
     try:
-      self.update_count += 1
+      for topic_name in args.topic_deltas: self.update_counts[topic_name] += 1
       response = DEFAULT_UPDATE_STATE_RESPONSE
       if self.update_cb is not None and self.exception is None:
         try:
           response = self.update_cb(self, args)
         except Exception, e:
+          # Print the original backtrace so it doesn't get lost.
+          traceback.print_exc()
           self.exception = e
       self.update_event.notify()
     finally:
@@ -278,21 +285,23 @@ class StatestoreSubscriber(object):
     finally:
       self.heartbeat_event.release()
 
-  def wait_for_update(self, count=None):
-    """Waits for some number of updates. If 'count' is provided, waits until the number
-    of updates seen by this subscriber exceeds count, otherwise waits for one further
-    update."""
+  def wait_for_update(self, topic_name, count=None):
+    """Waits for some number of updates of 'topic_name'. If 'count' is provided, waits
+    until the number updates seen by this subscriber exceeds count, otherwise waits
+    for one further update."""
     self.update_event.acquire()
+    start_time = time.time()
     try:
-      if count is not None and self.update_count >= count: return self
-      if count is None: count = self.update_count + 1
-      while count > self.update_count:
+      if count is not None and self.update_counts[topic_name] >= count: return self
+      if count is None: count = self.update_counts[topic_name] + 1
+      while count > self.update_counts[topic_name]:
         self.check_thread_exceptions()
-        last_count = self.update_count
+        last_count = self.update_counts[topic_name]
         self.update_event.wait(10)
-        if last_count == self.update_count:
-          raise Exception("Update not received within 10s (update count: %s)" %
-                          self.update_count)
+        if (time.time() > start_time + 10 and
+            last_count == self.update_counts[topic_name]):
+          raise Exception("Update not received for %s within 10s (update count: %s)" %
+                          (topic_name, last_count))
       self.check_thread_exceptions()
       return self
     finally:
@@ -340,14 +349,18 @@ class TestStatestore():
 
     def topic_update_correct(sub, args):
       delta = self.make_topic_update(topic_name)
-      if sub.update_count == 1:
+      update_count = sub.update_counts[topic_name]
+      if topic_name not in args.topic_deltas:
+        # The update doesn't contain our topic.
+        pass
+      elif update_count == 1:
         return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
                                     skipped=False)
-      elif sub.update_count == 2:
-        assert len(args.topic_deltas) == 1
+      elif update_count == 2:
+        assert len(args.topic_deltas) == 1, args.topic_deltas
         assert args.topic_deltas[topic_name].topic_entries == delta.topic_entries
         assert args.topic_deltas[topic_name].topic_name == delta.topic_name
-      elif sub.update_count == 3:
+      elif update_count == 3:
         # After the content-bearing update was processed, the next delta should be empty
         assert len(args.topic_deltas[topic_name].topic_entries) == 0
 
@@ -358,7 +371,7 @@ class TestStatestore():
     (
       sub.start()
          .register(topics=[reg])
-         .wait_for_update(3)
+         .wait_for_update(topic_name, 3)
     )
 
   def test_update_is_delta(self):
@@ -368,14 +381,18 @@ class TestStatestore():
     topic_name = "test_update_is_delta_%s" % uuid.uuid1()
 
     def check_delta(sub, args):
-      if sub.update_count == 1:
+      update_count = sub.update_counts[topic_name]
+      if topic_name not in args.topic_deltas:
+        # The update doesn't contain our topic.
+        pass
+      elif update_count == 1:
         assert args.topic_deltas[topic_name].is_delta == False
         delta = self.make_topic_update(topic_name)
         return TUpdateStateResponse(status=STATUS_OK, topic_updates=[delta],
                                     skipped=False)
-      elif sub.update_count == 2:
+      elif update_count == 2:
         assert args.topic_deltas[topic_name].is_delta == False
-      elif sub.update_count == 3:
+      elif update_count == 3:
         assert args.topic_deltas[topic_name].is_delta == True
         assert len(args.topic_deltas[topic_name].topic_entries) == 0
         assert args.topic_deltas[topic_name].to_version == 1
@@ -387,7 +404,7 @@ class TestStatestore():
     (
       sub.start()
          .register(topics=[reg])
-         .wait_for_update(3)
+         .wait_for_update(topic_name, 3)
     )
 
   def test_skipped(self):
@@ -395,7 +412,10 @@ class TestStatestore():
     topic_name = "test_skipped_%s" % uuid.uuid1()
 
     def check_skipped(sub, args):
-      if sub.update_count == 1:
+      # Ignore responses that don't contain our topic.
+      if topic_name not in args.topic_deltas: return DEFAULT_UPDATE_STATE_RESPONSE
+      update_count = sub.update_counts[topic_name]
+      if update_count == 1:
         update = self.make_topic_update(topic_name)
         return TUpdateStateResponse(status=STATUS_OK, topic_updates=[update],
                                     skipped=False)
@@ -410,15 +430,17 @@ class TestStatestore():
     (
       sub.start()
          .register(topics=[reg])
-         .wait_for_update(3)
+         .wait_for_update(topic_name, 3)
     )
 
   def test_failure_detected(self):
     sub = StatestoreSubscriber()
+    topic_name = "test_failure_detected"
+    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
     (
       sub.start()
-         .register()
-         .wait_for_update(1)
+         .register(topics=[reg])
+         .wait_for_update(topic_name, 1)
          .kill()
          .wait_for_failure()
     )
@@ -428,10 +450,12 @@ class TestStatestore():
     minutes) the statestore should time them out every 3s and then eventually fail after
     40s (10 times (3 + 1), where the 1 is the inter-heartbeat delay)"""
     sub = StatestoreSubscriber(heartbeat_cb=lambda sub, args: time.sleep(300))
+    topic_name = "test_hung_heartbeat"
+    reg = TTopicRegistration(topic_name=topic_name, is_transient=True)
     (
       sub.start()
-         .register()
-         .wait_for_update(1)
+         .register(topics=[reg])
+         .wait_for_update(topic_name, 1)
          .wait_for_failure(timeout=60)
     )
 
@@ -443,21 +467,32 @@ class TestStatestore():
     transient_topic_name = "test_topic_persistence_transient_%s" % topic_id
 
     def add_entries(sub, args):
-      if sub.update_count == 1:
-        updates = [self.make_topic_update(persistent_topic_name),
-                   self.make_topic_update(transient_topic_name)]
+      # None, one, or both of the topics may be in the update.
+      updates = []
+      if (persistent_topic_name in args.topic_deltas and
+          sub.update_counts[persistent_topic_name] == 1):
+        updates.append(self.make_topic_update(persistent_topic_name))
+
+      if (transient_topic_name in args.topic_deltas and
+          sub.update_counts[transient_topic_name] == 1):
+        updates.append(self.make_topic_update(transient_topic_name))
+
+      if len(updates) > 0:
         return TUpdateStateResponse(status=STATUS_OK, topic_updates=updates,
                                     skipped=False)
-
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     def check_entries(sub, args):
-      if sub.update_count == 1:
-        assert len(args.topic_deltas[transient_topic_name].topic_entries) == 0
+      # None, one, or both of the topics may be in the update.
+      if (persistent_topic_name in args.topic_deltas and
+          sub.update_counts[persistent_topic_name] == 1):
         assert len(args.topic_deltas[persistent_topic_name].topic_entries) == 1
         # Statestore should not send deletions when the update is not a delta, see
         # IMPALA-1891
         assert args.topic_deltas[persistent_topic_name].topic_entries[0].deleted == False
+      if (transient_topic_name in args.topic_deltas and
+          sub.update_counts[transient_topic_name] == 1):
+        assert len(args.topic_deltas[transient_topic_name].topic_entries) == 0
       return DEFAULT_UPDATE_STATE_RESPONSE
 
     reg = [TTopicRegistration(topic_name=persistent_topic_name, is_transient=False),
@@ -467,7 +502,8 @@ class TestStatestore():
     (
       sub.start()
          .register(topics=reg)
-         .wait_for_update(2)
+         .wait_for_update(persistent_topic_name, 2)
+         .wait_for_update(transient_topic_name, 2)
          .kill()
          .wait_for_failure()
     )
@@ -476,5 +512,6 @@ class TestStatestore():
     (
       sub2.start()
           .register(topics=reg)
-          .wait_for_update(1)
+          .wait_for_update(persistent_topic_name, 1)
+          .wait_for_update(transient_topic_name, 1)
     )
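
For reference, a minimal standalone sketch of the per-topic wait pattern the updated
test above uses. The TopicWaiter class and topic names are illustrative only and not
part of the patch; the logic mirrors wait_for_update's wall-clock timeout.

import threading
import time
from collections import defaultdict

class TopicWaiter(object):
    """Tracks per-topic update counts and blocks callers until a count is reached."""

    def __init__(self):
        self.update_counts = defaultdict(int)
        self.update_event = threading.Condition()

    def record_update(self, topic_name):
        # Called by the update handler for every delta that names 'topic_name'.
        with self.update_event:
            self.update_counts[topic_name] += 1
            self.update_event.notify_all()

    def wait_for_update(self, topic_name, count, timeout=10):
        # Block until 'topic_name' has seen at least 'count' updates, raising if that
        # count is not reached within 'timeout' seconds of wall-clock time.
        start_time = time.time()
        with self.update_event:
            while self.update_counts[topic_name] < count:
                self.update_event.wait(1)
                if time.time() > start_time + timeout:
                    raise Exception("Update not received for %s within %ss (count: %s)" %
                                    (topic_name, timeout, self.update_counts[topic_name]))

A test thread would call wait_for_update("some-topic", 3) while the RPC handler thread
calls record_update for each topic present in an incoming delta.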

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/www/statestore_subscribers.tmpl
----------------------------------------------------------------------
diff --git a/www/statestore_subscribers.tmpl b/www/statestore_subscribers.tmpl
index f117002..f57b4f6 100644
--- a/www/statestore_subscribers.tmpl
+++ b/www/statestore_subscribers.tmpl
@@ -25,6 +25,7 @@ under the License.
   <th>Id</th>
   <th>Address</th>
   <th>Subscribed topics</th>
+  <th>Subscribed priority topics</th>
   <th>Transient entries</th>
   <th>Registration ID</th>
 </tr>
@@ -34,6 +35,7 @@ under the License.
   <td>{{id}}</td>
   <td>{{address}}</td>
   <td>{{num_topics}}</td>
+  <td>{{num_priority_topics}}</td>
   <td>{{num_transient}}</td>
   <td>{{registration_id}}</td>
 </tr>

http://git-wip-us.apache.org/repos/asf/impala/blob/b0d3433e/www/statestore_topics.tmpl
----------------------------------------------------------------------
diff --git a/www/statestore_topics.tmpl b/www/statestore_topics.tmpl
index 99f0dfe..56568f7 100644
--- a/www/statestore_topics.tmpl
+++ b/www/statestore_topics.tmpl
@@ -25,6 +25,7 @@ under the License.
   <th>Topic Id</th>
   <th>Number of entries</th>
   <th>Version</th>
+  <th>Prioritized</th>
   <th>Oldest subscriber version</th>
   <th>Oldest subscriber ID</th>
   <th>Size (keys / values / total)</th>
@@ -35,6 +36,7 @@ under the License.
   <td>{{topic_id}}</td>
   <td>{{num_entries}}</td>
   <td>{{version}}</td>
+  <td>{{prioritized}}</td>
   <td>{{oldest_version}}</td>
   <td>{{oldest_id}}</td>
   <td>{{key_size}} / {{value_size}} / {{total_size}}</td>


[4/6] impala git commit: IMPALA-5269: Fix issue with final line of query followed by a comment

Posted by ta...@apache.org.
IMPALA-5269: Fix issue with final line of query followed by a comment

The patch strips any comments from a statement before checking whether
the statement ends with a semicolon delimiter.

For example:

Before (semicolon delimiter is needed at the end):
select 1 + 1; -- comment\n;

After (semicolon delimiter is no longer needed):
select 1 + 1; -- comment
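
A minimal sketch of the check, assuming the sqlparse library that impala_shell.py
already calls in the diff below; ends_with_delimiter is a hypothetical helper name,
not the shell's actual method:

import sqlparse

def ends_with_delimiter(line, delim=';'):
    # Strip comments first, so "select 1 + 1; -- comment" counts as terminated.
    stripped = sqlparse.format(line, strip_comments=True).rstrip()
    return stripped.endswith(delim)

print(ends_with_delimiter("select 1 + 1; -- comment"))  # True
print(ends_with_delimiter("select 1 + 1 -- comment"))   # False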

Testing:
- Ran end-to-end tests in shell

Change-Id: I54f9a8f65214023520eaa010fc462a663d02d258
Reviewed-on: http://gerrit.cloudera.org:8080/9191
Reviewed-by: Fredy Wijaya <fw...@cloudera.com>
Reviewed-by: Taras Bobrovytsky <tb...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/e1173653
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/e1173653
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/e1173653

Branch: refs/heads/master
Commit: e1173653b30ed3a0d8edfbed381446aa27c4fd09
Parents: f5986be
Author: Fredy Wijaya <fw...@cloudera.com>
Authored: Fri Feb 2 02:42:45 2018 -0600
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 14 07:38:28 2018 +0000

----------------------------------------------------------------------
 shell/impala_shell.py                 |  4 +++
 tests/shell/test_shell_interactive.py | 39 ++++++++++++++++++++++++++++++
 2 files changed, 43 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/e1173653/shell/impala_shell.py
----------------------------------------------------------------------
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 4a77d53..5a2ddc8 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -388,6 +388,10 @@ class ImpalaShell(object, cmd.Cmd):
     not considered terminated. If no open quotation is found, it's considered
     terminated.
     """
+    # Strip any comments so that a statement such as the following is considered
+    # to end with a delimiter:
+    # select 1 + 1; -- this is a comment
+    line = sqlparse.format(line, strip_comments=True).rstrip()
     if line.endswith(ImpalaShell.CMD_DELIM):
       try:
         # Look for an open quotation in the entire command, and not just the

http://git-wip-us.apache.org/repos/asf/impala/blob/e1173653/tests/shell/test_shell_interactive.py
----------------------------------------------------------------------
diff --git a/tests/shell/test_shell_interactive.py b/tests/shell/test_shell_interactive.py
index 7f7f955..7815405 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -438,6 +438,45 @@ class TestImpalaShellInteractive(object):
     finally:
       os.chdir(cwd)
 
+  @pytest.mark.execute_serially
+  def test_line_ends_with_comment(self):
+    # IMPALA-5269: Test lines that end with a comment.
+    queries = ['select 1 + 1; --comment',
+               'select 1 + 1 --comment\n;']
+    for query in queries:
+      result = run_impala_shell_interactive(query)
+      assert '| 1 + 1 |' in result.stdout
+      assert '| 2     |' in result.stdout
+
+    queries = ['select \'some string\'; --comment',
+               'select \'some string\' --comment\n;']
+    for query in queries:
+      result = run_impala_shell_interactive(query)
+      assert '| \'some string\' |' in result.stdout
+      assert '| some string   |' in result.stdout
+
+    queries = ['select "--"; -- "--"',
+               'select \'--\'; -- "--"',
+               'select "--" -- "--"\n;',
+               'select \'--\' -- "--"\n;']
+    for query in queries:
+      result = run_impala_shell_interactive(query)
+      assert '| \'--\' |' in result.stdout
+      assert '| --   |' in result.stdout
+
+    query = ('select * from (\n' +
+             'select count(*) from functional.alltypes\n' +
+             ') v; -- Incomplete SQL statement in this line')
+    result = run_impala_shell_interactive(query)
+    assert '| count(*) |' in result.stdout
+
+    query = ('select id from functional.alltypes\n' +
+             'order by id; /*\n' +
+             '* Multi-line comment\n' +
+             '*/')
+    result = run_impala_shell_interactive(query)
+    assert '| id   |' in result.stdout
+
 def run_impala_shell_interactive(input_lines, shell_args=None):
   """Runs a command in the Impala shell interactively."""
   # if argument "input_lines" is a string, makes it into a list


[3/6] impala git commit: Use unqualified table refs in TPCH planner tests.

Posted by ta...@apache.org.
Use unqualified table refs in TPCH planner tests.

There were a few places where we accidentally used fully-qualified
table references. As a result, the testTpchViews() test did not
exactly cover what was intended.

Change-Id: I886c451ab61a1739af96eeb765821dfd8e951b07
Reviewed-on: http://gerrit.cloudera.org:8080/9270
Reviewed-by: Taras Bobrovytsky <tb...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/f5986beb
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/f5986beb
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/f5986beb

Branch: refs/heads/master
Commit: f5986bebcbe7f89b2435ca67d1a7fe43a363541f
Parents: d45de47
Author: Alex Behm <al...@cloudera.com>
Authored: Fri Feb 9 11:18:12 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 14 05:54:25 2018 +0000

----------------------------------------------------------------------
 .../queries/PlannerTest/tpch-all.test           | 12 ++---
 .../queries/PlannerTest/tpch-views.test         | 50 ++++++++++----------
 2 files changed, 31 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/f5986beb/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index da2c745..d4a7d55 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -108,10 +108,10 @@ where
     select
       min(ps_supplycost)
     from
-      tpch.partsupp,
-      tpch.supplier,
-      tpch.nation,
-      tpch.region
+      partsupp,
+      supplier,
+      nation,
+      region
     where
       p_partkey = ps_partkey
       and s_suppkey = ps_suppkey
@@ -2460,7 +2460,7 @@ from (
     c_custkey,
     count(o_orderkey) as c_count
   from
-    customer left outer join tpch.orders on (
+    customer left outer join orders on (
       c_custkey = o_custkey
       and o_comment not like '%special%requests%'
     )
@@ -3749,7 +3749,7 @@ from
   supplier,
   lineitem l1,
   orders,
-  tpch.nation
+  nation
 where
   s_suppkey = l1.l_suppkey
   and o_orderkey = l1.l_orderkey

http://git-wip-us.apache.org/repos/asf/impala/blob/f5986beb/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
index 5bb8828..52af979 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-views.test
@@ -64,10 +64,10 @@ where
     select
       min(ps_supplycost)
     from
-      tpch.partsupp,
-      tpch.supplier,
-      tpch.nation,
-      tpch.region
+      partsupp,
+      supplier,
+      nation,
+      region
     where
       p_partkey = ps_partkey
       and s_suppkey = ps_suppkey
@@ -128,36 +128,36 @@ PLAN-ROOT SINK
 |     runtime filters: RF012 -> tpch.supplier.s_nationkey, RF014 -> tpch.supplier.s_suppkey
 |
 12:AGGREGATE [FINALIZE]
-|  output: min(ps_supplycost)
-|  group by: ps_partkey
+|  output: min(tpch.partsupp.ps_supplycost)
+|  group by: tpch.partsupp.ps_partkey
 |
 11:HASH JOIN [INNER JOIN]
-|  hash predicates: n_regionkey = r_regionkey
-|  runtime filters: RF004 <- r_regionkey
+|  hash predicates: tpch.nation.n_regionkey = tpch.region.r_regionkey
+|  runtime filters: RF004 <- tpch.region.r_regionkey
 |
 |--08:SCAN HDFS [tpch.region]
 |     partitions=1/1 files=1 size=384B
-|     predicates: r_name = 'EUROPE'
+|     predicates: tpch.region.r_name = 'EUROPE'
 |
 10:HASH JOIN [INNER JOIN]
-|  hash predicates: s_nationkey = n_nationkey
-|  runtime filters: RF006 <- n_nationkey
+|  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  runtime filters: RF006 <- tpch.nation.n_nationkey
 |
 |--07:SCAN HDFS [tpch.nation]
 |     partitions=1/1 files=1 size=2.15KB
-|     runtime filters: RF004 -> n_regionkey
+|     runtime filters: RF004 -> tpch.nation.n_regionkey
 |
 09:HASH JOIN [INNER JOIN]
-|  hash predicates: ps_suppkey = s_suppkey
-|  runtime filters: RF008 <- s_suppkey
+|  hash predicates: tpch.partsupp.ps_suppkey = tpch.supplier.s_suppkey
+|  runtime filters: RF008 <- tpch.supplier.s_suppkey
 |
 |--06:SCAN HDFS [tpch.supplier]
 |     partitions=1/1 files=1 size=1.33MB
-|     runtime filters: RF006 -> s_nationkey
+|     runtime filters: RF006 -> tpch.supplier.s_nationkey
 |
 05:SCAN HDFS [tpch.partsupp]
    partitions=1/1 files=1 size=112.71MB
-   runtime filters: RF000 -> tpch.partsupp.ps_partkey, RF008 -> ps_suppkey
+   runtime filters: RF000 -> tpch.partsupp.ps_partkey, RF008 -> tpch.partsupp.ps_suppkey
 ====
 # TPCH-Q3
 # Q3 - Shipping Priority Query
@@ -878,7 +878,7 @@ from (
     c_custkey,
     count(o_orderkey) as c_count
   from
-    customer left outer join tpch.orders on (
+    customer left outer join orders on (
       c_custkey = o_custkey
       and o_comment not like '%special%requests%'
     )
@@ -901,11 +901,11 @@ PLAN-ROOT SINK
 |  group by: count(o_orderkey)
 |
 03:AGGREGATE [FINALIZE]
-|  output: count(o_orderkey)
+|  output: count(tpch.orders.o_orderkey)
 |  group by: tpch.customer.c_custkey
 |
 02:HASH JOIN [RIGHT OUTER JOIN]
-|  hash predicates: o_custkey = tpch.customer.c_custkey
+|  hash predicates: tpch.orders.o_custkey = tpch.customer.c_custkey
 |  runtime filters: RF000 <- tpch.customer.c_custkey
 |
 |--00:SCAN HDFS [tpch.customer]
@@ -913,8 +913,8 @@ PLAN-ROOT SINK
 |
 01:SCAN HDFS [tpch.orders]
    partitions=1/1 files=1 size=162.56MB
-   predicates: NOT o_comment LIKE '%special%requests%'
-   runtime filters: RF000 -> o_custkey
+   predicates: NOT tpch.orders.o_comment LIKE '%special%requests%'
+   runtime filters: RF000 -> tpch.orders.o_custkey
 ====
 # TPCH-Q14
 # Q14 - Promotion Effect
@@ -1361,7 +1361,7 @@ from
   supplier,
   lineitem l1,
   orders,
-  tpch.nation
+  nation
 where
   s_suppkey = l1.l_suppkey
   and o_orderkey = l1.l_orderkey
@@ -1414,12 +1414,12 @@ PLAN-ROOT SINK
 |  |  runtime filters: RF000 <- tpch.lineitem.l_orderkey
 |  |
 |  |--08:HASH JOIN [INNER JOIN]
-|  |  |  hash predicates: tpch.supplier.s_nationkey = n_nationkey
-|  |  |  runtime filters: RF002 <- n_nationkey
+|  |  |  hash predicates: tpch.supplier.s_nationkey = tpch.nation.n_nationkey
+|  |  |  runtime filters: RF002 <- tpch.nation.n_nationkey
 |  |  |
 |  |  |--03:SCAN HDFS [tpch.nation]
 |  |  |     partitions=1/1 files=1 size=2.15KB
-|  |  |     predicates: n_name = 'SAUDI ARABIA'
+|  |  |     predicates: tpch.nation.n_name = 'SAUDI ARABIA'
 |  |  |
 |  |  07:HASH JOIN [INNER JOIN]
 |  |  |  hash predicates: tpch.lineitem.l_suppkey = tpch.supplier.s_suppkey


[2/6] impala git commit: IMPALA-6516: Log catalog update only if the catalog version changes

Posted by ta...@apache.org.
IMPALA-6516: Log catalog update only if the catalog version changes

Impalad logs a line whenever a statestore catalog update comes in. This
patch skips the logging when the catalog version doesn't change.
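
Sketched below in Python for brevity, the guard is simply "log only when the version
advances"; CatalogUpdateInfo and apply_catalog_update are illustrative names, and the
real change is the C++ diff that follows.

class CatalogUpdateInfo(object):
    def __init__(self):
        self.catalog_version = 0
        self.min_catalog_object_version = 0

def apply_catalog_update(info, new_version, new_min_object_version):
    # Log only when the catalog version actually changes; repeated statestore updates
    # carrying the same version are applied without emitting a log line.
    if info.catalog_version != new_version:
        print("Catalog topic update applied with version: %s, "
              "new min catalog object version: %s" %
              (new_version, new_min_object_version))
    info.catalog_version = new_version
    info.min_catalog_object_version = new_min_object_version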

Change-Id: I04b8dd05c588d4cd91e9ca2251f8f66325bb45e2
Reviewed-on: http://gerrit.cloudera.org:8080/9311
Reviewed-by: Michael Ho <kw...@cloudera.com>
Reviewed-by: anujphadke <ap...@cloudera.com>
Reviewed-by: Alex Behm <al...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d45de474
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d45de474
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d45de474

Branch: refs/heads/master
Commit: d45de4747c57fdfb6b88f30a4b4b7685696277fc
Parents: d7f2ce1
Author: Tianyi Wang <tw...@cloudera.com>
Authored: Tue Feb 13 13:21:35 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Feb 14 05:26:48 2018 +0000

----------------------------------------------------------------------
 be/src/service/impala-server.cc | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d45de474/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 9898029..d5be4dc 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1347,13 +1347,15 @@ void ImpalaServer::CatalogUpdateCallback(
   } else {
     {
       unique_lock<mutex> unique_lock(catalog_version_lock_);
+      if (catalog_update_info_.catalog_version != resp.new_catalog_version) {
+        LOG(INFO) << "Catalog topic update applied with version: " <<
+            resp.new_catalog_version << " new min catalog object version: " <<
+            resp.min_catalog_object_version;
+      }
       catalog_update_info_.catalog_version = resp.new_catalog_version;
       catalog_update_info_.catalog_topic_version = delta.to_version;
       catalog_update_info_.catalog_service_id = resp.catalog_service_id;
       catalog_update_info_.min_catalog_object_version = resp.min_catalog_object_version;
-      LOG(INFO) << "Catalog topic update applied with version: " <<
-          resp.new_catalog_version << " new min catalog object version: " <<
-          resp.min_catalog_object_version;
       catalog_update_info_.UpdateCatalogVersionMetrics();
     }
     ImpaladMetrics::CATALOG_READY->SetValue(resp.new_catalog_version > 0);