You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/11/16 15:39:47 UTC

[2/5] incubator-impala git commit: IMPALA-3613: Avoid topic updates to unregistered subscriber instances

IMPALA-3613: Avoid topic updates to unregistered subscriber instances

Bug:

Without this patch, when a subscriber repeatedly reconnects to the
statestore, the latter queues the initial heartbeat message and a
bunch of topic updates to every instance of the registered subscriber.
These queued updates are eventually picked up by the heartbeating/topic
update threads and the corresponding RPCs are made to the subscribers.
The subscriber then rejects these updates since they were meant for an
earlier registration. This is usually possible if the subscriber has
some network problems leading to failing RPCs.

Such a node is eventually marked by the statestore as bad, but depending
on the configuration in use, the issue can snowball into a DDoS-like
attack when the entire thread pool of heartbeating/topic updates is
filled with instances from the problematic host. This can result in
the statestore missing timely heartbeats to other subscribers making
them reconnect. This worsens the situation and the resulting topic
updates for the reconnects will fully saturate the network on the
statestore host, until the statestore daemon is restarted.

Fix:

This patch maps topic updates/heartbeats to a specific registered
subscriber instance rather than to a subscriber id (which stays the same
across reconnects). That way, when we encounter a topic update that was
meant for a stale subscriber instance, we can simply reject it.

Testing:

Tested this locally by adding relevant logging. I made the subscribers
reconnect aggressively (a) and delayed heartbeats from the statestore
side (b, c).

(a) --statestore_subscriber_timeout_seconds=1
(b) --statestore_max_missed_heartbeats=1000
(c) --statestore_heartbeat_frequency_ms=60000

Change-Id: I0329ae7d23dc6e9b04b7bc3ee8d89cbc73756f65
Reviewed-on: http://gerrit.cloudera.org:8080/8449
Reviewed-by: Bharath Vissapragada <bh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/46d1be4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/46d1be4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/46d1be4c

Branch: refs/heads/master
Commit: 46d1be4c3cfedfde79d7b1c177e0c52d54c98801
Parents: 6539e89
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Wed Nov 1 12:31:22 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Nov 15 23:53:36 2017 +0000

----------------------------------------------------------------------
 be/src/statestore/statestore-subscriber.cc | 12 ++--
 be/src/statestore/statestore-subscriber.h  |  8 +--
 be/src/statestore/statestore.cc            | 75 ++++++++++++++++---------
 be/src/statestore/statestore.h             | 36 +++++++++---
 4 files changed, 86 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46d1be4c/be/src/statestore/statestore-subscriber.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.cc b/be/src/statestore/statestore-subscriber.cc
index 4c5d480..678236e 100644
--- a/be/src/statestore/statestore-subscriber.cc
+++ b/be/src/statestore/statestore-subscriber.cc
@@ -282,13 +282,13 @@ void StatestoreSubscriber::RecoveryModeChecker() {
   }
 }
 
-Status StatestoreSubscriber::CheckRegistrationId(const TUniqueId& registration_id) {
+Status StatestoreSubscriber::CheckRegistrationId(const RegistrationId& registration_id) {
   {
     lock_guard<mutex> r(registration_id_lock_);
     // If this subscriber has just started, the registration_id_ may not have been set
-    // despite the statestore starting to send updates. The 'unset' TUniqueId is 0:0, so
-    // we can differentiate between a) an early message from an eager statestore, and b)
-    // a message that's targeted to a previous registration.
+    // despite the statestore starting to send updates. The 'unset' RegistrationId is 0:0,
+    // so we can differentiate between a) an early message from an eager statestore, and
+    // b) a message that's targeted to a previous registration.
     if (registration_id_ != TUniqueId() && registration_id != registration_id_) {
       return Status(Substitute("Unexpected registration ID: $0, was expecting $1",
           PrintId(registration_id), PrintId(registration_id_)));
@@ -298,7 +298,7 @@ Status StatestoreSubscriber::CheckRegistrationId(const TUniqueId& registration_i
   return Status::OK();
 }
 
-void StatestoreSubscriber::Heartbeat(const TUniqueId& registration_id) {
+void StatestoreSubscriber::Heartbeat(const RegistrationId& registration_id) {
   const Status& status = CheckRegistrationId(registration_id);
   if (status.ok()) {
     heartbeat_interval_metric_->Update(
@@ -310,7 +310,7 @@ void StatestoreSubscriber::Heartbeat(const TUniqueId& registration_id) {
 }
 
 Status StatestoreSubscriber::UpdateState(const TopicDeltaMap& incoming_topic_deltas,
-    const TUniqueId& registration_id, vector<TTopicDelta>* subscriber_topic_updates,
+    const RegistrationId& registration_id, vector<TTopicDelta>* subscriber_topic_updates,
     bool* skipped) {
   // We don't want to block here because this is an RPC, and delaying the return causes
   // the statestore to delay sending further messages. The only time that lock_ might be

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46d1be4c/be/src/statestore/statestore-subscriber.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore-subscriber.h b/be/src/statestore/statestore-subscriber.h
index 49db5d0..e8b2204 100644
--- a/be/src/statestore/statestore-subscriber.h
+++ b/be/src/statestore/statestore-subscriber.h
@@ -162,7 +162,7 @@ class StatestoreSubscriber {
   /// registration_id_ will change after Register() is called again. This allows the
   /// subscriber to reject communication from the statestore that pertains to a previous
   /// registration.
-  TUniqueId registration_id_;
+  RegistrationId registration_id_;
 
   struct Callbacks {
     /// Owned by the MetricGroup instance. Tracks how long callbacks took to process this
@@ -250,11 +250,11 @@ class StatestoreSubscriber {
   /// update was being processed, or if the subscriber currently believes it is
   /// recovering). Doing so indicates that no topics were updated during this call.
   Status UpdateState(const TopicDeltaMap& incoming_topic_deltas,
-      const TUniqueId& registration_id,
+      const RegistrationId& registration_id,
       std::vector<TTopicDelta>* subscriber_topic_updates, bool* skipped);
 
   /// Called when the statestore sends a heartbeat message. Updates the failure detector.
-  void Heartbeat(const TUniqueId& registration_id);
+  void Heartbeat(const RegistrationId& registration_id);
 
   /// Run in a separate thread. In a loop, check failure_detector_ to see if the statestore
   /// is still sending heartbeat messages. If not, enter 'recovery mode' where a
@@ -274,7 +274,7 @@ class StatestoreSubscriber {
   /// Returns OK if registration_id == registration_id_, or if registration_id_ is not yet
   /// set, an error otherwise. Used to confirm that RPCs from the statestore are intended
   /// for the current registration epoch.
-  Status CheckRegistrationId(const TUniqueId& registration_id);
+  Status CheckRegistrationId(const RegistrationId& registration_id);
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46d1be4c/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 6d5880f..d0a4851 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -110,7 +110,7 @@ class StatestoreThriftIf : public StatestoreServiceIf {
 
   virtual void RegisterSubscriber(TRegisterSubscriberResponse& response,
       const TRegisterSubscriberRequest& params) {
-    TUniqueId registration_id;
+    RegistrationId registration_id;
     Status status = statestore_->RegisterSubscriber(params.subscriber_id,
         params.subscriber_location, params.topic_registrations, &registration_id);
     status.ToThrift(&response.status);
@@ -175,7 +175,7 @@ void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
 }
 
 Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
-    const TUniqueId& registration_id, const TNetworkAddress& network_address,
+    const RegistrationId& registration_id, const TNetworkAddress& network_address,
     const vector<TTopicRegistration>& subscribed_topics)
     : subscriber_id_(subscriber_id),
       registration_id_(registration_id),
@@ -288,7 +288,8 @@ void Statestore::TopicsHandler(const Webserver::ArgumentMap& args,
     topic_json.AddMember("num_entries",
         static_cast<uint64_t>(topic.second.entries().size()),
         document->GetAllocator());
-    topic_json.AddMember("version", topic.second.last_version(), document->GetAllocator());
+    topic_json.AddMember(
+        "version", topic.second.last_version(), document->GetAllocator());
 
     SubscriberId oldest_subscriber_id;
     TopicEntry::Version oldest_subscriber_version =
@@ -353,7 +354,7 @@ Status Statestore::OfferUpdate(const ScheduledSubscriberUpdate& update,
     stringstream ss;
     ss << "Maximum subscriber limit reached: " << STATESTORE_MAX_SUBSCRIBERS;
     lock_guard<mutex> l(subscribers_lock_);
-    SubscriberMap::iterator subscriber_it = subscribers_.find(update.second);
+    SubscriberMap::iterator subscriber_it = subscribers_.find(update.subscriber_id);
     DCHECK(subscriber_it != subscribers_.end());
     subscribers_.erase(subscriber_it);
     LOG(ERROR) << ss.str();
@@ -365,7 +366,8 @@ Status Statestore::OfferUpdate(const ScheduledSubscriberUpdate& update,
 
 Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
     const TNetworkAddress& location,
-    const vector<TTopicRegistration>& topic_registrations, TUniqueId* registration_id) {
+    const vector<TTopicRegistration>& topic_registrations,
+    RegistrationId* registration_id) {
   if (subscriber_id.empty()) return Status("Subscriber ID cannot be empty string");
 
   // Create any new topics first, so that when the subscriber is first sent a topic update
@@ -401,7 +403,7 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
   }
 
   // Add the subscriber to the update queue, with an immediate schedule.
-  ScheduledSubscriberUpdate update = make_pair(0, subscriber_id);
+  ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
   RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
   RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
 
@@ -410,6 +412,17 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
   return Status::OK();
 }
 
+bool Statestore::FindSubscriber(const SubscriberId& subscriber_id,
+    const RegistrationId& registration_id, shared_ptr<Subscriber>* subscriber) {
+  DCHECK(subscriber != nullptr);
+  lock_guard<mutex> l(subscribers_lock_);
+  SubscriberMap::iterator it = subscribers_.find(subscriber_id);
+  if (it == subscribers_.end() ||
+      it->second->registration_id() != registration_id) return false;
+  *subscriber = it->second;
+  return true;
+}
+
 Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) {
   // Time any successful RPCs (i.e. those for which UpdateState() completed, even though
   // it may have returned an error.)
@@ -614,7 +627,13 @@ Status Statestore::SendHeartbeat(Subscriber* subscriber) {
 
 void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
     const ScheduledSubscriberUpdate& update) {
-  int64_t update_deadline = update.first;
+  int64_t update_deadline = update.deadline;
+  shared_ptr<Subscriber> subscriber;
+  // Check if the subscriber has re-registered, in which case we can ignore
+  // this scheduled update.
+  if (!FindSubscriber(update.subscriber_id, update.registration_id, &subscriber)) {
+    return;
+  }
   const string hb_type = is_heartbeat ? "heartbeat" : "topic update";
   if (update_deadline != 0) {
     // Wait until deadline.
@@ -623,9 +642,15 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
       SleepForMs(diff_ms);
       diff_ms = update_deadline - UnixMillis();
     }
+    // The subscriber can potentially reconnect by the time this thread wakes
+    // up. In such case, we can ignore this update.
+    if (UNLIKELY(!FindSubscriber(
+        subscriber->id(), subscriber->registration_id(), &subscriber))) {
+      return;
+    }
     diff_ms = std::abs(diff_ms);
-    VLOG(3) << "Sending " << hb_type << " message to: " << update.second
-            << " (deadline accuracy: " << diff_ms << "ms)";
+    VLOG(3) << "Sending " << hb_type << " message to: " << update.subscriber_id
+        << " (deadline accuracy: " << diff_ms << "ms)";
 
     if (diff_ms > DEADLINE_MISS_THRESHOLD_MS && is_heartbeat) {
       const string& msg = Substitute(
@@ -633,7 +658,8 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
           "consider increasing --statestore_heartbeat_frequency_ms (currently $3) on "
           "this Statestore, and --statestore_subscriber_timeout_seconds "
           "on subscribers (Impala daemons and the Catalog Server)",
-          update.second, hb_type, diff_ms, FLAGS_statestore_heartbeat_frequency_ms);
+          update.subscriber_id, hb_type, diff_ms,
+          FLAGS_statestore_heartbeat_frequency_ms);
       LOG(WARNING) << msg;
     }
     // Don't warn for topic updates - they can be slow and still correct. Recommending an
@@ -643,15 +669,9 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
   } else {
     // The first update is scheduled immediately and has a deadline of 0. There's no need
     // to wait.
-    VLOG(3) << "Initial " << hb_type << " message for: " << update.second;
-  }
-  shared_ptr<Subscriber> subscriber;
-  {
-    lock_guard<mutex> l(subscribers_lock_);
-    SubscriberMap::iterator it = subscribers_.find(update.second);
-    if (it == subscribers_.end()) return;
-    subscriber = it->second;
+    VLOG(3) << "Initial " << hb_type << " message for: " << update.subscriber_id;
   }
+
   // Send the right message type, and compute the next deadline
   int64_t deadline_ms = 0;
   Status status;
@@ -686,31 +706,32 @@ void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
     lock_guard<mutex> l(subscribers_lock_);
     // Check again if this registration has been removed while we were processing the
     // message.
-    SubscriberMap::iterator it = subscribers_.find(update.second);
-    if (it == subscribers_.end()) return;
+    SubscriberMap::iterator it = subscribers_.find(update.subscriber_id);
+    if (it == subscribers_.end() ||
+        it->second->registration_id() != update.registration_id) return;
     if (!status.ok()) {
       LOG(INFO) << "Unable to send " << hb_type << " message to subscriber "
-                << update.second << ", received error: " << status.GetDetail();
+                << update.subscriber_id << ", received error: " << status.GetDetail();
     }
 
-    const string& registration_id = PrintId(subscriber->registration_id());
     FailureDetector::PeerState state = is_heartbeat ?
-        failure_detector_->UpdateHeartbeat(registration_id, status.ok()) :
-        failure_detector_->GetPeerState(registration_id);
+        failure_detector_->UpdateHeartbeat(update.subscriber_id, status.ok()) :
+        failure_detector_->GetPeerState(update.subscriber_id);
 
     if (state == FailureDetector::FAILED) {
       if (is_heartbeat) {
         // TODO: Consider if a metric to track the number of failures would be useful.
         LOG(INFO) << "Subscriber '" << subscriber->id() << "' has failed, disconnected "
-                  << "or re-registered (last known registration ID: " << update.second
-                  << ")";
+                  << "or re-registered (last known registration ID: "
+                  << update.registration_id << ")";
         UnregisterSubscriber(subscriber.get());
       }
     } else {
       // Schedule the next message.
       VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " deadline for: "
               << subscriber->id() << " is in " << deadline_ms << "ms";
-      status = OfferUpdate(make_pair(deadline_ms, subscriber->id()), is_heartbeat ?
+      status = OfferUpdate(ScheduledSubscriberUpdate(deadline_ms, subscriber->id(),
+          subscriber->registration_id()), is_heartbeat ?
           &subscriber_heartbeat_threadpool_ : &subscriber_topic_update_threadpool_);
       if (!status.ok()) {
         LOG(INFO) << "Unable to send next " << (is_heartbeat ? "heartbeat" : "update")

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/46d1be4c/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 86a0d9e..38b8361 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -47,6 +47,7 @@ namespace impala {
 class Status;
 
 typedef ClientCache<StatestoreSubscriberClientWrapper> StatestoreSubscriberClientCache;
+typedef TUniqueId RegistrationId;
 
 /// The Statestore is a soft-state key-value store that maintains a set of Topics, which
 /// are maps from string keys to byte array values.
@@ -112,7 +113,7 @@ class Statestore : public CacheLineAligned {
   Status RegisterSubscriber(const SubscriberId& subscriber_id,
       const TNetworkAddress& location,
       const std::vector<TTopicRegistration>& topic_registrations,
-      TUniqueId* registration_id) WARN_UNUSED_RESULT;
+      RegistrationId* registration_id) WARN_UNUSED_RESULT;
 
   void RegisterWebpages(Webserver* webserver);
 
@@ -282,7 +283,7 @@ class Statestore : public CacheLineAligned {
   /// subscriber's ID and network location.
   class Subscriber {
    public:
-    Subscriber(const SubscriberId& subscriber_id, const TUniqueId& registration_id,
+    Subscriber(const SubscriberId& subscriber_id, const RegistrationId& registration_id,
         const TNetworkAddress& network_address,
         const std::vector<TTopicRegistration>& subscribed_topics);
 
@@ -304,7 +305,7 @@ class Statestore : public CacheLineAligned {
     const Topics& subscribed_topics() const { return subscribed_topics_; }
     const TNetworkAddress& network_address() const { return network_address_; }
     const SubscriberId& id() const { return subscriber_id_; }
-    const TUniqueId& registration_id() const { return registration_id_; }
+    const RegistrationId& registration_id() const { return registration_id_; }
 
     /// Records the fact that an update to this topic is owned by this subscriber.  The
     /// version number of the update is saved so that only those updates which are made
@@ -340,7 +341,7 @@ class Statestore : public CacheLineAligned {
     /// registration ID is handed out every time a subscriber successfully calls
     /// RegisterSubscriber() to distinguish between distinct connections from subscribers
     /// with the same subscriber_id_.
-    const TUniqueId registration_id_;
+    const RegistrationId registration_id_;
 
     /// The location of the subscriber service that this subscriber runs.
     const TNetworkAddress network_address_;
@@ -375,10 +376,22 @@ class Statestore : public CacheLineAligned {
   /// Used to generated unique IDs for each new registration.
   boost::uuids::random_generator subscriber_uuid_generator_;
 
-  /// Work item passed to both kinds of subscriber update threads. First entry is the
-  /// *earliest* time (in microseconds since epoch) that the next message should be sent,
-  /// the second entry is the subscriber to send it to.
-  typedef std::pair<int64_t, SubscriberId> ScheduledSubscriberUpdate;
+  /// Work item passed to both kinds of subscriber update threads.
+  struct ScheduledSubscriberUpdate {
+    /// *Earliest* time (in Unix time) that the next message should be sent.
+    int64_t deadline;
+
+    /// SubscriberId and RegistrationId of the registered subscriber instance this message
+    /// is intended for.
+    SubscriberId subscriber_id;
+    RegistrationId registration_id;
+
+    ScheduledSubscriberUpdate() {}
+
+    ScheduledSubscriberUpdate(int64_t next_update_time, SubscriberId s_id,
+        RegistrationId r_id): deadline(next_update_time), subscriber_id(s_id),
+        registration_id(r_id) {}
+  };
 
   /// The statestore has two pools of threads that send messages to subscribers
   /// one-by-one. One pool deals with 'heartbeat' messages that update failure detection
@@ -475,6 +488,13 @@ class Statestore : public CacheLineAligned {
   /// performing the RPC.
   Status SendHeartbeat(Subscriber* subscriber) WARN_UNUSED_RESULT;
 
+  /// Returns true (and sets subscriber to the corresponding Subscriber object) if a
+  /// registered subscriber exists in the subscribers_ map with the given subscriber_id
+  /// and registration_id. False otherwise.
+  bool FindSubscriber(const SubscriberId& subscriber_id,
+      const RegistrationId& registration_id, std::shared_ptr<Subscriber>* subscriber)
+      WARN_UNUSED_RESULT;
+
   /// Unregister a subscriber, removing all of its transient entries and evicting it from
   /// the subscriber map. Callers must hold subscribers_lock_ prior to calling this
   /// method.