Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/15 06:45:49 UTC

[2/2] impala git commit: IMPALA-4456: Address scalability issues of qs_map_lock_ and client_request_state_map_lock_

IMPALA-4456: Address scalability issues of qs_map_lock_ and client_request_state_map_lock_

The following two locks have been shown to be frequent points of contention
in recent perf runs:

- qs_map_lock_
- client_request_state_map_lock_

Since these are process-wide locks, any thread waiting on them can
slow down the runtime of a query.

I previously tried to address this by converting client_request_state_map_lock_
to a reader-writer lock. This showed great perf improvements in the general
case, but there were edge cases with big regressions as well.
In the general case, strict readers of the map got through so quickly
that we saw a reduction in the number of client connections created,
since this lock was also contended for in the context of Thrift threads.
The bad case was when writers were starved while trying to register a new
query because there were so many readers. Changing the starve option resulted
in worse read performance all round.

Another, relatively simpler approach is to shard the locks, which proves
to be very effective with no regressions. The maps and locks are
sharded into a default of 4 buckets initially.
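
As a rough illustration of the sharding idea (not the code in this patch), the
sketch below splits one map into a fixed number of shards, each with its own
lock, and picks the shard from the key. ShardedMapSketch, its std::uint64_t
keys, std::mutex and the 64-byte cache line are all assumptions made for the
example; the patch itself keys on TUniqueId and uses SpinLock.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>

// Hypothetical illustration of a sharded map; not the patch's ShardedQueryMap.
template <typename V, std::size_t kNumShards = 4>
class ShardedMapSketch {
 public:
  // Inserts 'value' under 'key'. Returns false if the key already exists.
  bool Insert(std::uint64_t key, const V& value) {
    Shard& shard = ShardFor(key);
    std::lock_guard<std::mutex> l(shard.lock);
    return shard.map.emplace(key, value).second;
  }

  // Copies the value stored under 'key' into '*out'. Returns false if absent.
  bool Find(std::uint64_t key, V* out) {
    assert(out != nullptr);
    Shard& shard = ShardFor(key);
    std::lock_guard<std::mutex> l(shard.lock);
    auto it = shard.map.find(key);
    if (it == shard.map.end()) return false;
    *out = it->second;
    return true;
  }

  // Index of the shard that owns 'key'.
  static std::size_t ShardIndex(std::uint64_t key) { return key % kNumShards; }

 private:
  // Each shard keeps its lock next to its map. The 64-byte alignment is an
  // assumed cache-line size, used so that shards do not share a cache line.
  struct alignas(64) Shard {
    std::mutex lock;
    std::unordered_map<std::uint64_t, V> map;
  };

  Shard& ShardFor(std::uint64_t key) { return shards_[ShardIndex(key)]; }

  std::array<Shard, kNumShards> shards_;
};
```

Two threads only contend when their keys fall into the same shard, so with
evenly distributed keys the chance that two given operations collide on a lock
drops to roughly one in kNumShards.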

Query IDs are created using boost::uuids::random_generator. We use the
high 64 bits of a query ID (query_id.hi), taken modulo the number of
buckets, to assign queries to buckets. I verified on my local machine that
this distributes query IDs evenly across buckets:

For 10,000 queries sharded across 4 buckets, the distribution was:
bucket[0]: 2500
bucket[1]: 2489
bucket[2]: 2566
bucket[3]: 2445
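
A check of this kind could be sketched as follows. This is not the code that
produced the numbers above: it substitutes std::mt19937_64 for
boost::uuids::random_generator, and HiToBucket and CountBuckets are
hypothetical helper names.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <random>

constexpr std::uint32_t kNumBuckets = 4;

// Maps the high 64-bit word of an ID to a bucket in [0, kNumBuckets).
inline std::uint32_t HiToBucket(std::uint64_t hi) {
  return static_cast<std::uint32_t>(hi % kNumBuckets);
}

// Draws 'num_ids' pseudo-random 64-bit values (stand-ins for query_id.hi)
// and counts how many land in each bucket.
inline std::array<int, kNumBuckets> CountBuckets(int num_ids, std::uint64_t seed) {
  assert(num_ids >= 0);
  std::mt19937_64 rng(seed);
  std::array<int, kNumBuckets> counts{};
  for (int i = 0; i < num_ids; ++i) {
    ++counts[HiToBucket(rng())];
  }
  return counts;
}
```

Calling CountBuckets(10000, seed) and printing the four counts gives a
histogram comparable to the one above; with a uniform generator each count
should sit close to 2500.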

A micro-benchmark is added to measure the improvement in performance. This
benchmark creates multiple threads each of which creates a QueryState and
accesses it multiple times. We can see improvements in the range 2x - 3.5x.

BEFORE:
------------------Benchmark 1: Create and access Query States.
Total Time (#Queries: 5 #Accesses: 100) : 1ms
Total Time (#Queries: 50 #Accesses: 100) : 8ms
Total Time (#Queries: 50 #Accesses: 1000) : 54ms
Total Time (#Queries: 500 #Accesses: 100) : 76ms
Total Time (#Queries: 500 #Accesses: 1000) : 543ms

AFTER:
------------------Benchmark 1: Create and access Query States.
Total Time (#Queries: 5 #Accesses: 100) : 2173.59K clock cycles
Total Time (#Queries: 50 #Accesses: 100) : 4ms
Total Time (#Queries: 50 #Accesses: 1000) : 15ms
Total Time (#Queries: 500 #Accesses: 100) : 46ms
Total Time (#Queries: 500 #Accesses: 1000) : 151ms

This change introduces a ShardedQueryMap, which replaces
QueryExecMgr::qs_map_ and ImpalaServer::client_request_state_map_
along with their corresponding locks, thereby abstracting away access to
the maps' locks.
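
The access pattern this enables can be sketched as a scoped object that locks
one shard on construction and unlocks it on destruction. The sketch below is a
simplified stand-in under stated assumptions, not the patch's classes:
ShardedStoreSketch and ScopedShardRefSketch are hypothetical names, keys are
plain std::uint64_t and the lock is std::mutex.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <unordered_map>

// Hypothetical sharded container. Its shards are reachable only through
// ScopedShardRefSketch, so callers cannot touch a map without its lock.
template <typename V>
class ShardedStoreSketch {
 public:
  static constexpr std::size_t kNumShards = 4;

 private:
  template <typename V2>
  friend class ScopedShardRefSketch;

  struct Shard {
    std::mutex lock;
    std::unordered_map<std::uint64_t, V> map;
  };
  Shard shards_[kNumShards];
};

// Locks the shard that owns 'key' for the lifetime of this object and exposes
// that shard's map through operator->.
template <typename V>
class ScopedShardRefSketch {
 public:
  ScopedShardRefSketch(std::uint64_t key, ShardedStoreSketch<V>* store)
      : shard_(nullptr) {
    assert(store != nullptr);
    shard_ = &store->shards_[key % ShardedStoreSketch<V>::kNumShards];
    shard_->lock.lock();
  }

  ~ScopedShardRefSketch() { shard_->lock.unlock(); }

  ScopedShardRefSketch(const ScopedShardRefSketch&) = delete;
  ScopedShardRefSketch& operator=(const ScopedShardRefSketch&) = delete;

  std::unordered_map<std::uint64_t, V>* operator->() { return &shard_->map; }

 private:
  typename ShardedStoreSketch<V>::Shard* shard_;
};
```

A caller opens a block, constructs a ScopedShardRefSketch<V> for the key, and
uses ref->find(), ref->emplace() or ref->erase() inside that block. With a
non-recursive lock such as the one assumed here, a thread must not hold two
references to the same shard at once.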

For operations that need to happen on every entry in a ShardedQueryMap,
a new function ShardedQueryMap::DoFuncForAllEntries() is introduced. It
takes a user-supplied lambda and invokes it on the value of every
individual map entry.
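
A minimal sketch of such a traversal, assuming std::uint64_t keys and
std::mutex in place of TUniqueId and SpinLock (ForEachShardSketch and
ForEachValue are hypothetical names):

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <unordered_map>

// Hypothetical sharded container with a visit-all operation.
template <typename V>
class ForEachShardSketch {
 public:
  void Insert(std::uint64_t key, const V& value) {
    Shard& shard = shards_[key % kNumShards];
    std::lock_guard<std::mutex> l(shard.lock);
    shard.map[key] = value;
  }

  // Calls 'fn' on every stored value, holding one shard's lock at a time.
  void ForEachValue(const std::function<void(const V&)>& fn) {
    assert(fn != nullptr);
    for (Shard& shard : shards_) {
      std::lock_guard<std::mutex> l(shard.lock);
      for (const auto& entry : shard.map) fn(entry.second);
    }
  }

 private:
  static constexpr std::size_t kNumShards = 4;
  struct Shard {
    std::mutex lock;
    std::unordered_map<std::uint64_t, V> map;
  };
  Shard shards_[kNumShards];
};
```

Because the locks are taken shard by shard, a traversal like this is not an
atomic snapshot of the whole map: entries in shards that are not currently
locked can be added or removed while it runs. The callback also must not call
back into the same container, since it runs with a shard lock held.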

NOTE: The microbenchmark showed that SpinLock performs better than
boost::mutex for the sharded qs_map_lock_, so that change has been made
too.
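
For readers unfamiliar with spinlocks, the idea can be sketched with a single
atomic flag. This is not Impala's SpinLock, which is a different
implementation; SpinLockSketch and GuardedCounterSketch are hypothetical names,
and yielding while waiting is an assumption of this example.

```cpp
#include <atomic>
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical minimal spinlock. It provides lock/try_lock/unlock, so it can
// be used with std::lock_guard.
class SpinLockSketch {
 public:
  void lock() {
    // Retry until the previous value was false, i.e. we took the lock.
    while (locked_.exchange(true, std::memory_order_acquire)) {
      std::this_thread::yield();
    }
  }
  bool try_lock() { return !locked_.exchange(true, std::memory_order_acquire); }
  void unlock() { locked_.store(false, std::memory_order_release); }

 private:
  std::atomic<bool> locked_{false};
};

// A counter whose increments are serialized by the spinlock.
class GuardedCounterSketch {
 public:
  long Increment() {
    std::lock_guard<SpinLockSketch> l(lock_);
    return ++value_;
  }

 private:
  SpinLockSketch lock_;
  long value_ = 0;
};

// Small usage demo: returns the counter value after two increments.
inline long DemoSpinLock() {
  GuardedCounterSketch counter;
  long first = counter.Increment();
  assert(first == 1);
  return counter.Increment();
}
```

A lock like this avoids putting the waiting thread to sleep, which tends to
pay off when the critical section is as short as a single hash-map lookup.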

TODO: Add benchmark for client_request_state_map_lock_ too. The APIs
around that are more complicated, so this patch only includes
the benchmarking of qs_map_lock_.

TODO 2: Consider adopting the ShardedQueryMap template for the SessionStateMap.

Change-Id: I61089090e1095da45a8a64ed3ccc78bd310807f1
Reviewed-on: http://gerrit.cloudera.org:8080/8363
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/19ab465b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/19ab465b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/19ab465b

Branch: refs/heads/master
Commit: 19ab465b391167e99658c9270a2a3d76b2b2652f
Parents: cce0b2d
Author: Sailesh Mukil <sa...@apache.org>
Authored: Fri Oct 6 16:27:47 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 15 05:04:50 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/CMakeLists.txt                |   1 +
 .../benchmarks/process-wide-locks-benchmark.cc  | 174 +++++++++++++++++++
 be/src/runtime/query-exec-mgr.cc                |  32 ++--
 be/src/runtime/query-exec-mgr.h                 |  10 +-
 be/src/service/impala-http-handler.cc           |  23 ++-
 be/src/service/impala-server.cc                 |  34 ++--
 be/src/service/impala-server.h                  |  29 ++--
 be/src/util/sharded-query-map-util.h            | 135 ++++++++++++++
 8 files changed, 379 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/benchmarks/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt
index a569a66..02fbaad 100644
--- a/be/src/benchmarks/CMakeLists.txt
+++ b/be/src/benchmarks/CMakeLists.txt
@@ -48,6 +48,7 @@ ADD_BE_BENCHMARK(multiint-benchmark)
 ADD_BE_BENCHMARK(network-perf-benchmark)
 ADD_BE_BENCHMARK(overflow-benchmark)
 ADD_BE_BENCHMARK(parse-timestamp-benchmark)
+ADD_BE_BENCHMARK(process-wide-locks-benchmark)
 ADD_BE_BENCHMARK(row-batch-serialize-benchmark)
 ADD_BE_BENCHMARK(scheduler-benchmark)
 ADD_BE_BENCHMARK(status-benchmark)

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/benchmarks/process-wide-locks-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/process-wide-locks-benchmark.cc b/be/src/benchmarks/process-wide-locks-benchmark.cc
new file mode 100644
index 0000000..ffe4268
--- /dev/null
+++ b/be/src/benchmarks/process-wide-locks-benchmark.cc
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <iostream>
+
+#include <boost/bind.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+
+#include "runtime/query-exec-mgr.h"
+#include "runtime/query-state.h"
+#include "runtime/test-env.h"
+#include "scheduling/request-pool-service.h"
+#include "service/fe-support.h"
+#include "util/benchmark.h"
+#include "util/cpu-info.h"
+#include "util/debug-util.h"
+#include "util/metrics.h"
+#include "util/pretty-printer.h"
+#include "util/stopwatch.h"
+#include "util/thread.h"
+#include "util/uid-util.h"
+
+#include "common/init.h"
+#include "common/names.h"
+
+/// This tests the performance of the following process wide locks:
+//
+/// 1. qs_map_lock_ (Sharded)
+/// TODO: client_request_state_map_lock_ (Sharded)
+//
+/// A reasonable amount of queries are created and accessed multiple times via the
+/// QueryExecMgr's APIs to benchmark the time taken to acquire the lock and retrieve
+/// the QueryState.
+//
+/// ------------------Benchmark 1: Create and access Query States.
+/// Total Time (#Queries: 5 #Accesses: 100) : 2202.44K clock cycles
+/// Total Time (#Queries: 50 #Accesses: 100) : 4ms
+/// Total Time (#Queries: 50 #Accesses: 1000) : 16ms
+/// Total Time (#Queries: 500 #Accesses: 100) : 46ms
+/// Total Time (#Queries: 500 #Accesses: 1000) : 129ms
+/// Total Time (#Queries: 500 #Accesses: 5000) : 518ms
+/// Total Time (#Queries: 1000 #Accesses: 1000) : 246ms
+/// Total Time (#Queries: 1000 #Accesses: 5000) : 1s018ms
+//
+/// This was created to test improvements for IMPALA-4456.
+
+using boost::uuids::random_generator;
+
+using namespace impala;
+
+boost::scoped_ptr<TestEnv> test_env_;
+vector<TUniqueId> query_ids;
+
+// This function creates a QueryState and accesses it 'num_accesses' times, via the
+// QueryExecMgr APIs.
+// TODO: Add a similar function for ClientRequestStates.
+void CreateAndAccessQueryStates(const TUniqueId& query_id, int num_accesses) {
+  TQueryCtx query_ctx;
+  query_ctx.query_id = query_id;
+
+  string resolved_pool;
+  Status s = ExecEnv::GetInstance()->request_pool_service()->ResolveRequestPool(
+      query_ctx, &resolved_pool);
+
+  query_ctx.__set_request_pool(resolved_pool);
+
+  QueryState *query_state;
+  query_state = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx);
+  DCHECK(query_state != nullptr);
+  query_state->AcquireExecResourceRefcount();
+
+  for (int i=0; i < num_accesses ; ++i) {
+    QueryState* qs;
+    qs = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
+    DCHECK(qs != nullptr);
+    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(qs);
+  }
+
+  query_state->ReleaseExecResourceRefcount();
+  // This should drop the last reference count to the QueryState and destroy it.
+  ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state);
+  // Make sure that the query doesn't exist in the map any longer.
+  DCHECK(ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id) == nullptr);
+
+}
+
+// Runs 'num_threads' Impala Threads and has each of them execute func().
+void ImpalaThreadStarter(void (*func) (const TUniqueId&, int), int num_threads,
+    int func_arg) {
+  vector<unique_ptr<Thread>> threads;
+  threads.reserve(num_threads);
+
+  for (int i=0; i < num_threads; ++i) {
+    unique_ptr<Thread> thread;
+    function<void ()> f =
+        bind(func, query_ids[i], func_arg);
+    Status s =
+        Thread::Create("mythreadgroup", "thread", f, &thread);
+    DCHECK(s.ok());
+    threads.push_back(move(thread));
+  }
+  for (unique_ptr<Thread>& thread: threads) {
+    thread->Join();
+  }
+}
+
+void RunBenchmark(int num_queries, int num_accesses) {
+  StopWatch total_time;
+  total_time.Start();
+  ImpalaThreadStarter(CreateAndAccessQueryStates, num_queries, num_accesses);
+  total_time.Stop();
+
+  cout << "Total Time " << "(#Queries: " << num_queries << " #Accesses: "
+       << num_accesses << ") : "
+       << PrettyPrinter::Print(total_time.ElapsedTime(), TUnit::CPU_TICKS) << endl;
+}
+
+// Create and store 'num_queries' Query IDs into 'query_ids'.
+void CreateQueryIds(int num_queries) {
+  for (int i=0; i < num_queries; ++i) {
+    query_ids[i] = UuidToQueryId(random_generator()());
+  }
+}
+
+int main(int argc, char **argv) {
+  // Though we don't use the JVM or require FeSupport, the TestEnv class requires it,
+  // so we start them up.
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+
+  const int MAX_QUERIES = 1000;
+
+  query_ids.resize(MAX_QUERIES);
+
+  test_env_.reset(new TestEnv());
+  ABORT_IF_ERROR(test_env_->Init());
+
+  CreateQueryIds(MAX_QUERIES);
+
+  cout << "------------------Benchmark 1: Create and access Query States." << endl;
+  RunBenchmark(5, 100);
+  RunBenchmark(50, 100);
+  RunBenchmark(50, 1000);
+  RunBenchmark(500, 100);
+  RunBenchmark(500, 1000);
+  RunBenchmark(500, 5000);
+  RunBenchmark(1000, 1000);
+  RunBenchmark(1000, 5000);
+
+  cout << endl;
+
+  // TODO: Benchmark lock of ClientRequestStates too.
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 316b712..967dc4b 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -82,9 +82,12 @@ QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
   QueryState* qs = nullptr;
   int refcnt;
   {
-    lock_guard<mutex> l(qs_map_lock_);
-    auto it = qs_map_.find(query_id);
-    if (it == qs_map_.end()) return nullptr;
+    ScopedShardedMapRef<QueryState*> map_ref(query_id,
+        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto it = map_ref->find(query_id);
+    if (it == map_ref->end()) return nullptr;
     qs = it->second;
     refcnt = qs->refcnt_.Add(1);
   }
@@ -98,12 +101,15 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
   QueryState* qs = nullptr;
   int refcnt;
   {
-    lock_guard<mutex> l(qs_map_lock_);
-    auto it = qs_map_.find(query_ctx.query_id);
-    if (it == qs_map_.end()) {
+    ScopedShardedMapRef<QueryState*> map_ref(query_ctx.query_id,
+        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto it = map_ref->find(query_ctx.query_id);
+    if (it == map_ref->end()) {
       // register new QueryState
       qs = new QueryState(query_ctx);
-      qs_map_.insert(make_pair(query_ctx.query_id, qs));
+      map_ref->insert(make_pair(query_ctx.query_id, qs));
       *created = true;
     } else {
       qs = it->second;
@@ -153,18 +159,20 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
 
   QueryState* qs_from_map = nullptr;
   {
-    // for now, gc right away
-    lock_guard<mutex> l(qs_map_lock_);
-    auto it = qs_map_.find(query_id);
+    ScopedShardedMapRef<QueryState*> map_ref(query_id,
+        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto it = map_ref->find(query_id);
     // someone else might have gc'd the entry
-    if (it == qs_map_.end()) return;
+    if (it == map_ref->end()) return;
     qs_from_map = it->second;
     DCHECK_EQ(qs_from_map->query_ctx().query_id, query_id);
     int32_t cnt = qs_from_map->refcnt_.Load();
     DCHECK_GE(cnt, 0);
     // someone else might have increased the refcnt in the meantime
     if (cnt > 0) return;
-    qs_map_.erase(it);
+    map_ref->erase(it);
   }
   // TODO: send final status report during gc, but do this from a different thread
   delete qs_from_map;

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 8a0c884..bddd731 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -23,8 +23,8 @@
 #include <unordered_map>
 
 #include "common/status.h"
-#include "util/uid-util.h"
 #include "gen-cpp/Types_types.h"
+#include "util/sharded-query-map-util.h"
 
 namespace impala {
 
@@ -39,7 +39,7 @@ class FragmentInstanceState;
 /// entry point for gaining refcounted access to a QueryState. It also initiates
 /// query execution.
 /// Thread-safe.
-class QueryExecMgr {
+class QueryExecMgr : public CacheLineAligned {
  public:
   /// Creates QueryState if it doesn't exist and initiates execution of all fragment
   /// instance for this query. All fragment instances hold a reference to their
@@ -64,11 +64,9 @@ class QueryExecMgr {
   void ReleaseQueryState(QueryState* qs);
 
  private:
-  /// protects qs_map_
-  boost::mutex qs_map_lock_;
 
-  /// map from query id to QueryState (owned by us)
-  std::unordered_map<TUniqueId, QueryState*> qs_map_;
+  typedef ShardedQueryMap<QueryState*> QueryStateMap;
+  QueryStateMap qs_map_;
 
   /// Gets the existing QueryState or creates a new one if not present.
   /// 'created' is set to true if it was created, false otherwise.

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index b633f2a..0156023 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -37,6 +37,7 @@
 #include "util/redactor.h"
 #include "util/summary-util.h"
 #include "util/time.h"
+#include "util/uid-util.h"
 #include "util/webserver.h"
 
 #include "common/names.h"
@@ -243,12 +244,11 @@ void ImpalaHttpHandler::QueryProfileEncodedHandler(const Webserver::ArgumentMap&
 
 void ImpalaHttpHandler::InflightQueryIdsHandler(const Webserver::ArgumentMap& args,
     Document* document) {
-  lock_guard<mutex> l(server_->client_request_state_map_lock_);
   stringstream ss;
-  for (const ImpalaServer::ClientRequestStateMap::value_type& request_state:
-       server_->client_request_state_map_) {
-    ss << request_state.second->query_id() << "\n";
-  }
+  server_->client_request_state_map_.DoFuncForAllEntries(
+      [&](const std::shared_ptr<ClientRequestState>& request_state) {
+          ss << request_state->query_id() << "\n";
+      });
   document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
   Value query_ids(ss.str().c_str(), document->GetAllocator());
   document->AddMember("contents", query_ids, document->GetAllocator());
@@ -366,14 +366,11 @@ void ImpalaHttpHandler::QueryStateHandler(const Webserver::ArgumentMap& args,
     Document* document) {
   set<ImpalaServer::QueryStateRecord, ImpalaServer::QueryStateRecordLessThan>
       sorted_query_records;
-  {
-    lock_guard<mutex> l(server_->client_request_state_map_lock_);
-    for (const ImpalaServer::ClientRequestStateMap::value_type& request_state:
-         server_->client_request_state_map_) {
-      // TODO: Do this in the browser so that sorts on other keys are possible.
-      sorted_query_records.insert(ImpalaServer::QueryStateRecord(*request_state.second));
-    }
-  }
+
+  server_->client_request_state_map_.DoFuncForAllEntries(
+      [&](const std::shared_ptr<ClientRequestState>& request_state) {
+          sorted_query_records.insert(ImpalaServer::QueryStateRecord(*request_state));
+      });
 
   Value in_flight_queries(kArrayType);
   int64_t num_waiting_queries = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 3866e40..c43c850 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -921,6 +921,7 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
   // single generator under a lock (since random_generator is not
   // thread-safe).
   query_ctx->query_id = UuidToQueryId(random_generator()());
+
 }
 
 Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
@@ -933,16 +934,19 @@ Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
   if (session_state->closed) return Status("Session has been closed, ignoring query.");
   const TUniqueId& query_id = request_state->query_id();
   {
-    lock_guard<mutex> l(client_request_state_map_lock_);
-    ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
-    if (entry != client_request_state_map_.end()) {
+    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
+        &ExecEnv::GetInstance()->impala_server()->client_request_state_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto entry = map_ref->find(query_id);
+    if (entry != map_ref->end()) {
       // There shouldn't be an active query with that same id.
       // (query_id is globally unique)
       stringstream ss;
       ss << "query id " << PrintId(query_id) << " already exists";
       return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, ss.str()));
     }
-    client_request_state_map_.insert(make_pair(query_id, request_state));
+    map_ref->insert(make_pair(query_id, request_state));
   }
   // Metric is decremented in UnregisterQuery().
   ImpaladMetrics::NUM_QUERIES_REGISTERED->Increment(1L);
@@ -989,14 +993,17 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
 
   shared_ptr<ClientRequestState> request_state;
   {
-    lock_guard<mutex> l(client_request_state_map_lock_);
-    ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
-    if (entry == client_request_state_map_.end()) {
+    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
+        &ExecEnv::GetInstance()->impala_server()->client_request_state_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto entry = map_ref->find(query_id);
+    if (entry == map_ref->end()) {
       return Status("Invalid or unknown query handle");
     } else {
       request_state = entry->second;
     }
-    client_request_state_map_.erase(entry);
+    map_ref->erase(entry);
   }
 
   request_state->Done();
@@ -2046,12 +2053,15 @@ void ImpalaServer::Join() {
 
 shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState(
     const TUniqueId& query_id) {
-  lock_guard<mutex> l(client_request_state_map_lock_);
-  ClientRequestStateMap::iterator i = client_request_state_map_.find(query_id);
-  if (i == client_request_state_map_.end()) {
+  ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
+      &ExecEnv::GetInstance()->impala_server()->client_request_state_map_);
+  DCHECK(map_ref.get() != nullptr);
+
+  auto entry = map_ref->find(query_id);
+  if (entry == map_ref->end()) {
     return shared_ptr<ClientRequestState>();
   } else {
-    return i->second;
+    return entry->second;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 237b0cb..abe8694 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -22,11 +22,11 @@
 #include <boost/thread/mutex.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/scoped_ptr.hpp>
-#include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
 #include <boost/uuid/uuid.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 #include <boost/uuid/uuid_io.hpp>
+#include <unordered_map>
 
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/ImpalaHiveServer2Service.h"
@@ -39,10 +39,10 @@
 #include "util/condition-variable.h"
 #include "util/metrics.h"
 #include "util/runtime-profile.h"
+#include "util/sharded-query-map-util.h"
 #include "util/simple-logger.h"
 #include "util/thread-pool.h"
 #include "util/time.h"
-#include "util/uid-util.h"
 #include "runtime/coordinator.h"
 #include "runtime/runtime-state.h"
 #include "runtime/timestamp-value.h"
@@ -126,7 +126,6 @@ class ClientRequestState;
 /// * uuid_lock_
 /// * catalog_version_lock_
 /// * connection_to_sessions_map_lock_
-/// * client_request_state_map_lock_
 ///
 /// TODO: The state of a running query is currently not cleaned up if the
 /// query doesn't experience any errors at runtime and close() doesn't get called.
@@ -138,8 +137,10 @@ class ClientRequestState;
 class ImpalaServer : public ImpalaServiceIf,
                      public ImpalaHiveServer2ServiceIf,
                      public ThriftServer::ConnectionHandlerIf,
-                     public boost::enable_shared_from_this<ImpalaServer> {
+                     public boost::enable_shared_from_this<ImpalaServer>,
+                     public CacheLineAligned {
  public:
+
   ImpalaServer(ExecEnv* exec_env);
   ~ImpalaServer();
 
@@ -502,8 +503,8 @@ class ImpalaServer : public ImpalaServiceIf,
       std::shared_ptr<SessionState> session_state, bool* registered_exec_state,
       std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT;
 
-  /// Registers the query exec state with client_request_state_map_ using the globally
-  /// unique query_id and add the query id to session state's open query list.
+  /// Registers the query exec state with client_request_state_map_ using the
+  /// globally unique query_id and add the query id to session state's open query list.
   /// The caller must have checked out the session state.
   Status RegisterQuery(std::shared_ptr<SessionState> session_state,
       const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;
@@ -521,9 +522,9 @@ class ImpalaServer : public ImpalaServiceIf,
       const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;
 
   /// Unregister the query by cancelling it, removing exec_state from
-  /// client_request_state_map_, and removing the query id from session state's in-flight
-  /// query list.  If check_inflight is true, then return an error if the query is not
-  /// yet in-flight.  Otherwise, proceed even if the query isn't yet in-flight (for
+  /// client_request_state_map_, and removing the query id from session state's
+  /// in-flight query list.  If check_inflight is true, then return an error if the query
+  /// is not yet in-flight.  Otherwise, proceed even if the query isn't yet in-flight (for
   /// cleaning up after an error on the query issuing path).
   Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
       const Status* cause = NULL) WARN_UNUSED_RESULT;
@@ -623,7 +624,7 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Copies a query's state into the query log. Called immediately prior to a
   /// ClientRequestState's deletion. Also writes the query profile to the profile log
-  /// on disk. Must be called with client_request_state_map_lock_ held
+  /// on disk.
   void ArchiveQuery(const ClientRequestState& query);
 
   /// Checks whether the given user is allowed to delegate as the specified do_as_user.
@@ -868,16 +869,12 @@ class ImpalaServer : public ImpalaServiceIf,
   /// when there are sessions that have a timeout.
   ConditionVariable session_timeout_cv_;
 
-  /// map from query id to exec state; ClientRequestState is owned by us and referenced
+  /// maps from query id to exec state; ClientRequestState is owned by us and referenced
   /// as a shared_ptr to allow asynchronous deletion
-  typedef boost::unordered_map<TUniqueId, std::shared_ptr<ClientRequestState>>
+  typedef class ShardedQueryMap<std::shared_ptr<ClientRequestState>>
       ClientRequestStateMap;
   ClientRequestStateMap client_request_state_map_;
 
-  /// Protects client_request_state_map_. See "Locking" in the class comment for lock
-  /// acquisition order.
-  boost::mutex client_request_state_map_lock_;
-
   /// Default query options in the form of TQueryOptions and beeswax::ConfigVariable
   TQueryOptions default_query_options_;
   std::vector<beeswax::ConfigVariable> default_configs_;

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/util/sharded-query-map-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/sharded-query-map-util.h b/be/src/util/sharded-query-map-util.h
new file mode 100644
index 0000000..59dc2d5
--- /dev/null
+++ b/be/src/util/sharded-query-map-util.h
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SHARDED_QUERY_MAP_UTIL_H
+#define SHARDED_QUERY_MAP_UTIL_H
+
+#include <boost/thread/lock_guard.hpp>
+#include <unordered_map>
+
+#include "gen-cpp/Types_types.h"
+#include "util/aligned-new.h"
+#include "util/spinlock.h"
+#include "util/uid-util.h"
+
+namespace impala {
+
+/// This is a template that can be used for any map that maps from a query ID (TUniqueId)
+/// to some object, and that needs to be sharded. It provides a SpinLock per shard to
+/// synchronize access to each shard of the map. The underlying shard is locked and
+/// accessed by instantiating a ScopedShardedMapRef.
+//
+/// Usage pattern:
+//
+///   typedef ShardedQueryMap<QueryState*> QueryStateMap;
+///   QueryStateMap qs_map_;
+//
+template<typename T>
+class ShardedQueryMap {
+ public:
+
+  // This function takes a lambda which should take a parameter of object 'T' and
+  // runs the lambda for all the entries in the map. The lambda should have a return
+  // type of 'void'.
+  // TODO: If necessary, refactor the lambda signature to allow returning Status objects.
+  void DoFuncForAllEntries(const std::function<void(const T&)>& call) {
+    for (int i = 0; i < NUM_QUERY_BUCKETS; ++i) {
+      boost::lock_guard<SpinLock> l(shards_[i].map_lock_);
+      for (const auto& map_value_ref: shards_[i].map_) {
+        call(map_value_ref.second);
+      }
+    }
+  }
+
+ private:
+  template <typename T2>
+  friend class ScopedShardedMapRef;
+
+  // Number of buckets to split the containers of query IDs into.
+  static constexpr uint32_t NUM_QUERY_BUCKETS = 4;
+
+  // We group the map and its corresponding lock together to avoid false sharing. Since
+  // we will always access a map and its corresponding lock together, it's better if
+  // they can be allocated on the same cache line.
+  struct MapShard : public CacheLineAligned {
+    std::unordered_map<TUniqueId, T> map_;
+    SpinLock map_lock_;
+  };
+  struct MapShard shards_[NUM_QUERY_BUCKETS];
+};
+
+/// Use this class to obtain a locked reference to the underlying map shard
+/// of a ShardedQueryMap, corresponding to the 'query_id'.
+//
+/// Pattern:
+/// {
+///   ScopedShardedMapRef<T> map_ref(qid, &sharded_map);
+///   DCHECK(map_ref.get() != nullptr);  <nullptr should never be returned>
+///   ...
+/// }
+//
+/// The caller should ensure that the lifetime of the ShardedQueryMap is longer
+/// than the lifetime of this scoped class.
+template <typename T>
+class ScopedShardedMapRef {
+ public:
+
+  // Finds the appropriate map that could/should contain 'query_id' and locks it.
+  ScopedShardedMapRef(
+      const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map) {
+    DCHECK(sharded_map != nullptr);
+    int qs_map_bucket = QueryIdToBucket(query_id);
+    shard_ = &sharded_map->shards_[qs_map_bucket];
+
+    // Lock the corresponding shard.
+    shard_->map_lock_.lock();
+  }
+
+  ~ScopedShardedMapRef() {
+    shard_->map_lock_.DCheckLocked();
+    shard_->map_lock_.unlock();
+  }
+
+  // Returns the shard (map) for the 'query_id' passed to the constructor.
+  // Should never return nullptr.
+  std::unordered_map<TUniqueId, T>* get() {
+    shard_->map_lock_.DCheckLocked();
+    return &shard_->map_;
+  }
+
+  std::unordered_map<TUniqueId, T>* operator->() {
+    shard_->map_lock_.DCheckLocked();
+    return get();
+  }
+
+ private:
+
+  // Return the correct bucket that a query ID would belong to.
+  inline int QueryIdToBucket(const TUniqueId& query_id) {
+    int bucket =
+        static_cast<int>(query_id.hi) % ShardedQueryMap<T>::NUM_QUERY_BUCKETS;
+    DCHECK(bucket < ShardedQueryMap<T>::NUM_QUERY_BUCKETS && bucket >= 0);
+    return bucket;
+  }
+
+  typename ShardedQueryMap<T>::MapShard* shard_;
+  DISALLOW_COPY_AND_ASSIGN(ScopedShardedMapRef);
+};
+
+} // namespace impala
+
+#endif /* SHARDED_QUERY_MAP_UTIL_H */