You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ph...@apache.org on 2018/02/02 18:51:36 UTC
[12/19] impala git commit: IMPALA-2642: Fix a potential deadlock in statestore
IMPALA-2642: Fix a potential deadlock in statestore
The statestored can deadlock once the number of subscribers has
reached STATESTORE_MAX_SUBSCRIBERS, because DoSubscriberUpdate()
calls OfferUpdate() while holding subscribers_lock_, and in this
situation OfferUpdate() tries to acquire the same lock again.
Fix the issue by removing the acquisition of subscribers_lock_ from
OfferUpdate() and relying on its callers to hold the lock. We also make
the maximum number of statestore subscribers a startup-time tunable,
which allows us to test the limit more easily.
Testing: The problem is easily reproduced by lowering the value of
STATESTORE_MAX_SUBSCRIBERS to 3, and then launching a mini cluster
with 3 impalads. Without the fix, the statestored becomes completely
deadlocked.
A new EE test has been added to exercise this scenario. The test
verifies that statestored correctly rejects new subscription
requests when the limit is reached.
Change-Id: I5d49dede221ce1f50ec299643b5532c61f93f0c6
Reviewed-on: http://gerrit.cloudera.org:8080/9038
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/ca01c9b7
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/ca01c9b7
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/ca01c9b7
Branch: refs/heads/2.x
Commit: ca01c9b70f5d55b1f4c990c5f2897d110268bfff
Parents: 4e5f039
Author: Zoram Thanga <zo...@cloudera.com>
Authored: Tue Jan 16 12:01:09 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Feb 2 01:10:15 2018 +0000
----------------------------------------------------------------------
be/src/statestore/statestore.cc | 33 ++++----
be/src/statestore/statestore.h | 2 +-
tests/custom_cluster/test_custom_statestore.py | 88 +++++++++++++++++++++
3 files changed, 108 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/ca01c9b7/be/src/statestore/statestore.cc
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.cc b/be/src/statestore/statestore.cc
index 0f72e58..8f4ddbf 100644
--- a/be/src/statestore/statestore.cc
+++ b/be/src/statestore/statestore.cc
@@ -62,6 +62,9 @@ DEFINE_int32(statestore_heartbeat_tcp_timeout_seconds, 3, "(Advanced) The time a
"badly hung machines that are not able to respond to the heartbeat RPC in short "
"order");
+DEFINE_int32(statestore_max_subscribers, 10000, "Used to control the maximum size "
+ "of the pending topic-update queue. There is at most one entry per subscriber.");
+
// If this value is set too low, it's possible that UpdateState() might timeout during a
// working invocation, and only a restart of the statestore with a change in value would
// allow progress to be made. If set too high, a hung subscriber will waste an update
@@ -93,10 +96,6 @@ const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
// an item with the initial version.
const Statestore::TopicEntry::Version Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0;
-// Used to control the maximum size of the pending topic-update queue, in which there is
-// at most one entry per subscriber.
-const int32_t STATESTORE_MAX_SUBSCRIBERS = 10000;
-
// Updates or heartbeats that miss their deadline by this much are logged.
const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
@@ -216,12 +215,12 @@ Statestore::Statestore(MetricGroup* metrics)
subscriber_topic_update_threadpool_("statestore-update",
"subscriber-update-worker",
FLAGS_statestore_num_update_threads,
- STATESTORE_MAX_SUBSCRIBERS,
+ FLAGS_statestore_max_subscribers,
bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, false, _1, _2)),
subscriber_heartbeat_threadpool_("statestore-heartbeat",
"subscriber-heartbeat-worker",
FLAGS_statestore_num_heartbeat_threads,
- STATESTORE_MAX_SUBSCRIBERS,
+ FLAGS_statestore_max_subscribers,
bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, _2)),
update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
FLAGS_statestore_update_tcp_timeout_seconds * 1000,
@@ -349,11 +348,17 @@ void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args,
Status Statestore::OfferUpdate(const ScheduledSubscriberUpdate& update,
ThreadPool<ScheduledSubscriberUpdate>* threadpool) {
- if (threadpool->GetQueueSize() >= STATESTORE_MAX_SUBSCRIBERS
+ // Somewhat confusingly, we're checking the number of entries in a particular
+ // threadpool's work queue to decide whether or not we have too many
+ // subscribers. The number of subscribers registered can be actually more
+ // than statestore_max_subscribers. This is because RegisterSubscriber() adds
+ // the new subscriber to subscribers_ first before scheduling its updates.
+ // Should we be stricter in enforcing this limit on subscribers_.size() itself?
+ if (threadpool->GetQueueSize() >= FLAGS_statestore_max_subscribers
|| !threadpool->Offer(update)) {
stringstream ss;
- ss << "Maximum subscriber limit reached: " << STATESTORE_MAX_SUBSCRIBERS;
- lock_guard<mutex> l(subscribers_lock_);
+ ss << "Maximum subscriber limit reached: " << FLAGS_statestore_max_subscribers;
+ ss << ", subscribers_ size: " << subscribers_.size();
SubscriberMap::iterator subscriber_it = subscribers_.find(update.subscriber_id);
DCHECK(subscriber_it != subscribers_.end());
subscribers_.erase(subscriber_it);
@@ -400,12 +405,12 @@ Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
PrintId(current_registration->registration_id()), true);
num_subscribers_metric_->SetValue(subscribers_.size());
subscriber_set_metric_->Add(subscriber_id);
- }
- // Add the subscriber to the update queue, with an immediate schedule.
- ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
- RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
- RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
+ // Add the subscriber to the update queue, with an immediate schedule.
+ ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
+ RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
+ RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
+ }
LOG(INFO) << "Subscriber '" << subscriber_id << "' registered (registration id: "
<< PrintId(*registration_id) << ")";
http://git-wip-us.apache.org/repos/asf/impala/blob/ca01c9b7/be/src/statestore/statestore.h
----------------------------------------------------------------------
diff --git a/be/src/statestore/statestore.h b/be/src/statestore/statestore.h
index 38b8361..deeb5aa 100644
--- a/be/src/statestore/statestore.h
+++ b/be/src/statestore/statestore.h
@@ -461,7 +461,7 @@ class Statestore : public CacheLineAligned {
StatsMetric<double>* heartbeat_duration_metric_;
/// Utility method to add an update to the given thread pool, and to fail if the thread
- /// pool is already at capacity.
+ /// pool is already at capacity. Assumes that subscribers_lock_ is held by the caller.
Status OfferUpdate(const ScheduledSubscriberUpdate& update,
ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT;
http://git-wip-us.apache.org/repos/asf/impala/blob/ca01c9b7/tests/custom_cluster/test_custom_statestore.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_custom_statestore.py b/tests/custom_cluster/test_custom_statestore.py
new file mode 100644
index 0000000..ee810ed
--- /dev/null
+++ b/tests/custom_cluster/test_custom_statestore.py
@@ -0,0 +1,88 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests statestore with non-default startup options
+
+import logging
+import os
+import pytest
+import re
+import sys
+import uuid
+import socket
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.impala_test_suite import ImpalaTestSuite
+
+from Types.ttypes import TNetworkAddress
+from thrift.protocol import TBinaryProtocol
+from thrift.transport import TSocket, TTransport
+
+import StatestoreService.StatestoreSubscriber as Subscriber
+import StatestoreService.StatestoreService as Statestore
+from ErrorCodes.ttypes import TErrorCode
+
+LOG = logging.getLogger('custom_statestore_test')
+STATESTORE_SERVICE_PORT = 24000
+
+# A simple wrapper class to launch a cluster where we can tune various
+# startup parameters of the statestored to test correct boundary-value
+# behavior.
+class TestCustomStatestore(CustomClusterTestSuite):
+ # Grab a port the statestore subscribers will use to connect.
+ # Note that all subscribers we create below use this port to connect,
+ # with different subscriber IDs.
+ handle = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ handle.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ handle.bind(('localhost', 0))
+ _, port = handle.getsockname()
+
+ @classmethod
+ def get_workload(self):
+ return 'functional-query'
+
+ def __register_subscriber(self):
+ subscriber_id = "python-test-client-%s" % uuid.uuid4()
+ topics = []
+ request = Subscriber.TRegisterSubscriberRequest(topic_registrations=topics,
+ subscriber_location=TNetworkAddress("localhost", self.port),
+ subscriber_id=subscriber_id)
+ client_transport = \
+ TTransport.TBufferedTransport(TSocket.TSocket('localhost', STATESTORE_SERVICE_PORT))
+ protocol = TBinaryProtocol.TBinaryProtocol(client_transport)
+ client = Statestore.Client(protocol)
+ client_transport.open()
+ return client.RegisterSubscriber(request)
+
+ @CustomClusterTestSuite.with_args(statestored_args="-statestore_max_subscribers=3")
+ def test_statestore_max_subscribers(self):
+ """Test that the statestored correctly handles the condition where the number
+ of subscribers exceeds FLAGS_statestore_max_subscribers
+ (see be/src/statestore/statestore.cc). The expected behavior is for the
+ statestored to reject the subscription request once the threshold is
+ exceeded."""
+ # With a statestore_max_subscribers of 3, we should hit the registration error
+ # pretty quick.
+ for x in xrange(20):
+ response = self.__register_subscriber()
+ if response.status.status_code == TErrorCode.OK:
+ self.registration_id = response.registration_id
+ LOG.log(logging.INFO, "Registration id %s, x=%d" % (response.registration_id, x))
+ else:
+ assert 'Maximum subscriber limit reached:' in ''.join(response.status.error_msgs)
+ return