You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/15 06:45:48 UTC

[1/2] impala git commit: IMPALA-6519: API to allocate unreserved buffer

Repository: impala
Updated Branches:
  refs/heads/master b0d3433e3 -> 19ab465b3


IMPALA-6519: API to allocate unreserved buffer

The motivation is to allow allocation of buffers without reservation in
ExchangeNode. Currently this it not possible because
IncreaseReservationToFit() followed by AllocateBuffer() is non-atomic.
We need to handle concurrent allocations in ExchangeNode because there
may be multiple batches being received at a given time.

This is a temporary solution until we can implement proper reservations
in ExchangeNode (IMPALA-6524).

Testing:
Added basic unit test.

Change-Id: Ia4d17b3db25491f796484de22405fbdee7a0f983
Reviewed-on: http://gerrit.cloudera.org:8080/9250
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/cce0b2de
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/cce0b2de
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/cce0b2de

Branch: refs/heads/master
Commit: cce0b2de28e81ffd7a9622e3e5eba7a2081a7fba
Parents: b0d3433
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Feb 7 17:39:11 2018 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 15 01:35:34 2018 +0000

----------------------------------------------------------------------
 .../runtime/bufferpool/buffer-pool-internal.h   | 18 +++++--
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 50 ++++++++++++-----
 be/src/runtime/bufferpool/buffer-pool.cc        | 57 +++++++++++++++-----
 be/src/runtime/bufferpool/buffer-pool.h         | 15 ++++++
 .../runtime/bufferpool/reservation-tracker.cc   | 12 +++++
 be/src/runtime/bufferpool/reservation-tracker.h | 12 ++++-
 6 files changed, 131 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 7094942..dee8e4f 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -238,11 +238,19 @@ class BufferPool::Client {
   /// page->pin_in_flight was set to true by StartMoveToPinned().
   Status FinishMoveEvictedToPinned(Page* page) WARN_UNUSED_RESULT;
 
-  /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer()
-  /// API to deduct from the client's reservation and update internal accounting. Cleans
-  /// dirty pages if needed to satisfy the buffer pool's internal invariants. No page or
-  /// client locks should be held by the caller.
-  Status PrepareToAllocateBuffer(int64_t len) WARN_UNUSED_RESULT;
+  /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer() or
+  /// AllocateUnreservedBuffer() APIs. Deducts from the client's reservation and updates
+  /// internal accounting. Cleans dirty pages if needed to satisfy the buffer pool's
+  /// internal invariants. No page or client locks should be held by the caller.
+  /// If 'reserved' is true, we assume that the memory is already reserved. If it is
+  /// false, tries to increase the reservation if needed.
+  ///
+  /// On success, returns OK and sets 'success' to true if non-NULL. If an error is
+  /// encountered, e.g. while cleaning pages, returns an error status. If the reservation
+  /// could not be increased for an unreserved allocation, returns OK and sets 'success'
+  /// to false (for unreserved allocations, 'success' must be non-NULL).
+  Status PrepareToAllocateBuffer(
+      int64_t len, bool reserved, bool* success) WARN_UNUSED_RESULT;
 
   /// Implementation of ClientHandle::DecreaseReservationTo().
   Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 0138a08..d6547d2 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -367,6 +367,7 @@ class BufferPoolTest : public ::testing::Test {
   }
 
   /// Parameterised test implementations.
+  void TestBufferAllocation(bool reserved);
   void TestMemoryReclamation(BufferPool* pool, int src_core, int dst_core);
   void TestEvictionPolicy(int64_t page_size);
   void TestCleanPageLimit(int max_clean_pages, bool randomize_core);
@@ -550,27 +551,43 @@ TEST_F(BufferPoolTest, PageCreation) {
   global_reservations_.Close();
 }
 
-TEST_F(BufferPoolTest, BufferAllocation) {
+TEST_F(BufferPoolTest, ReservedBufferAllocation) {
+  TestBufferAllocation(true);
+}
+
+TEST_F(BufferPoolTest, UnreservedBufferAllocation) {
+  TestBufferAllocation(false);
+}
+
+void BufferPoolTest::TestBufferAllocation(bool reserved) {
   // Allocate many buffers, each a power-of-two multiple of the minimum buffer length.
-  int num_buffers = 16;
-  int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
-  int64_t total_mem = 2 * 2 * max_buffer_len;
-  global_reservations_.InitRootTracker(NULL, total_mem);
-  BufferPool pool(TEST_BUFFER_LEN, total_mem, total_mem);
+  const int NUM_BUFFERS = 16;
+  const int64_t MAX_BUFFER_LEN = TEST_BUFFER_LEN << (NUM_BUFFERS - 1);
+
+  // Total memory required to allocate TEST_BUFFER_LEN, 2*TEST_BUFFER_LEN, ...,
+  // MAX_BUFFER_LEN.
+  const int64_t TOTAL_MEM = 2 * MAX_BUFFER_LEN - TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM, TOTAL_MEM);
   BufferPool::ClientHandle client;
   ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
-      total_mem, NewProfile(), &client));
-  ASSERT_TRUE(client.IncreaseReservationToFit(total_mem));
+      TOTAL_MEM, NewProfile(), &client));
+  if (reserved) ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
 
-  vector<BufferPool::BufferHandle> handles(num_buffers);
+  vector<BufferPool::BufferHandle> handles(NUM_BUFFERS);
 
   // Create buffers of various valid sizes.
   int64_t total_allocated = 0;
-  for (int i = 0; i < num_buffers; ++i) {
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
     int size_multiple = 1 << i;
     int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
     int64_t used_before = client.GetUsedReservation();
-    ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    if (reserved) {
+      ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    } else {
+      // Reservation should be automatically increased.
+      ASSERT_OK(pool.AllocateUnreservedBuffer(&client, buffer_len, &handles[i]));
+    }
     total_allocated += buffer_len;
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].data() != NULL);
@@ -583,8 +600,15 @@ TEST_F(BufferPoolTest, BufferAllocation) {
     EXPECT_EQ(0, pool.GetFreeBufferBytes());
   }
 
+  if (!reserved) {
+    // Allocate all of the memory and test the failure path for unreserved allocations.
+    BufferPool::BufferHandle tmp_handle;
+    ASSERT_OK(pool.AllocateUnreservedBuffer(&client, TEST_BUFFER_LEN, &tmp_handle));
+    ASSERT_FALSE(tmp_handle.is_open()) << "No reservation for buffer";
+  }
+
   // Close the handles and check memory consumption.
-  for (int i = 0; i < num_buffers; ++i) {
+  for (int i = 0; i < NUM_BUFFERS; ++i) {
     int64_t used_before = client.GetUsedReservation();
     int buffer_len = handles[i].len();
     pool.FreeBuffer(&client, &handles[i]);
@@ -597,7 +621,7 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
   // But freed memory is not released to the system immediately.
   EXPECT_EQ(total_allocated, pool.GetSystemBytesAllocated());
-  EXPECT_EQ(num_buffers, pool.GetNumFreeBuffers());
+  EXPECT_EQ(NUM_BUFFERS, pool.GetNumFreeBuffers());
   EXPECT_EQ(total_allocated, pool.GetFreeBufferBytes());
   global_reservations_.Close();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 6a111ff..2d06f7b 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -226,12 +226,23 @@ Status BufferPool::ExtractBuffer(
 
 Status BufferPool::AllocateBuffer(
     ClientHandle* client, int64_t len, BufferHandle* handle) {
-  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len));
+  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len, true, nullptr));
   Status status = allocator_->Allocate(client, len, handle);
-  if (!status.ok()) {
-    // Allocation failed - update client's accounting to reflect the failure.
-    client->impl_->FreedBuffer(len);
-  }
+  // If the allocation failed, update client's accounting to reflect the failure.
+  if (!status.ok()) client->impl_->FreedBuffer(len);
+  return status;
+}
+
+Status BufferPool::AllocateUnreservedBuffer(
+    ClientHandle* client, int64_t len, BufferHandle* handle) {
+  DCHECK(!handle->is_open());
+  bool success;
+  RETURN_IF_ERROR(client->impl_->PrepareToAllocateBuffer(len, false, &success));
+  if (!success) return Status::OK(); // Leave 'handle' closed to indicate failure.
+
+  Status status = allocator_->Allocate(client, len, handle);
+  // If the allocation failed, update client's accounting to reflect the failure.
+  if (!status.ok()) client->impl_->FreedBuffer(len);
   return status;
 }
 
@@ -546,14 +557,34 @@ Status BufferPool::Client::FinishMoveEvictedToPinned(Page* page) {
   return Status::OK();
 }
 
-Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) {
-  unique_lock<mutex> lock(lock_);
-  // Clean enough pages to allow allocation to proceed without violating our eviction
-  // policy. This can fail, so only update the accounting once success is ensured.
-  RETURN_IF_ERROR(CleanPages(&lock, len));
-  reservation_.AllocateFrom(len);
-  buffers_allocated_bytes_ += len;
-  DCHECK_CONSISTENCY();
+Status BufferPool::Client::PrepareToAllocateBuffer(
+    int64_t len, bool reserved, bool* success) {
+  if (success != nullptr) *success = false;
+  // Don't need to hold the client's 'lock_' yet because 'reservation_' operations are
+  // threadsafe.
+  if (reserved) {
+    // The client must have already reserved the memory.
+    reservation_.AllocateFrom(len);
+  } else {
+    DCHECK(success != nullptr);
+    // The client may not have reserved the memory.
+    if (!reservation_.IncreaseReservationToFitAndAllocate(len)) return Status::OK();
+  }
+
+  {
+    unique_lock<mutex> lock(lock_);
+    // Clean enough pages to allow allocation to proceed without violating our eviction
+    // policy.
+    Status status = CleanPages(&lock, len);
+    if (!status.ok()) {
+      // Reverse the allocation.
+      reservation_.ReleaseTo(len);
+      return status;
+    }
+    buffers_allocated_bytes_ += len;
+    DCHECK_CONSISTENCY();
+  }
+  if (success != nullptr) *success = true;
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 5b98579..285aacb 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -239,6 +239,21 @@ class BufferPool : public CacheLineAligned {
   Status AllocateBuffer(
       ClientHandle* client, int64_t len, BufferHandle* handle) WARN_UNUSED_RESULT;
 
+  /// Like AllocateBuffer(), except used when the client may not have the reservation
+  /// to allocate the buffer. Tries to increase reservation on the behalf of the client
+  /// if needed to allocate the buffer. If the reservation isn't available, 'handle'
+  /// isn't opened and OK is returned. If an unexpected error occurs, an error is
+  /// returned and any reservation increase remains in effect. Safe to call concurrently
+  /// with any other operations for 'client', except for operations on the same 'handle'.
+  ///
+  /// This function is a transitional mechanism for components to allocate memory from
+  /// the buffer pool without implementing the reservation accounting required to operate
+  /// within a predetermined memory constraint. Wherever possible, clients should reserve
+  /// memory ahead of time and allocate out of that instead of relying on this "best
+  /// effort" interface.
+  Status AllocateUnreservedBuffer(
+      ClientHandle* client, int64_t len, BufferHandle* handle) WARN_UNUSED_RESULT;
+
   /// If 'handle' is open, close 'handle', free the buffer and decrease the reservation
   /// usage from 'client'. Idempotent. Safe to call concurrently with other operations
   /// for 'client', except for operations on the same 'handle'.

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index aba5dce..f0e1839 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -141,6 +141,14 @@ bool ReservationTracker::IncreaseReservationToFit(int64_t bytes, Status* error_s
   return IncreaseReservationInternalLocked(bytes, true, false, error_status);
 }
 
+bool ReservationTracker::IncreaseReservationToFitAndAllocate(
+    int64_t bytes, Status* error_status) {
+  lock_guard<SpinLock> l(lock_);
+  if (!IncreaseReservationInternalLocked(bytes, true, false, error_status)) return false;
+  AllocateFromLocked(bytes);
+  return true;
+}
+
 bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
     bool use_existing_reservation, bool is_child_reservation, Status* error_status) {
   DCHECK(initialized_);
@@ -359,6 +367,10 @@ vector<ReservationTracker*> ReservationTracker::FindPathToRoot() {
 
 void ReservationTracker::AllocateFrom(int64_t bytes) {
   lock_guard<SpinLock> l(lock_);
+  AllocateFromLocked(bytes);
+}
+
+void ReservationTracker::AllocateFromLocked(int64_t bytes) {
   DCHECK(initialized_);
   DCHECK_GE(bytes, 0);
   DCHECK_LE(bytes, unused_reservation());

http://git-wip-us.apache.org/repos/asf/impala/blob/cce0b2de/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index ff4b77e..3bf2de1 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -129,8 +129,13 @@ class ReservationTracker {
   /// Returns true if the reservation increase was successful or not necessary. Otherwise
   /// returns false and if 'error_status' is non-null, it returns an appropriate status
   /// message in it.
-  bool IncreaseReservationToFit(int64_t bytes, Status* error_status = nullptr)
-      WARN_UNUSED_RESULT;
+  bool IncreaseReservationToFit(
+      int64_t bytes, Status* error_status = nullptr) WARN_UNUSED_RESULT;
+
+  /// Like IncreaseReservationToFit(), except 'bytes' is also allocated from
+  /// the reservation on success.
+  bool IncreaseReservationToFitAndAllocate(
+      int64_t bytes, Status* error_status = nullptr) WARN_UNUSED_RESULT;
 
   /// Decrease reservation by 'bytes' on this tracker and all ancestors. This tracker's
   /// reservation must be at least 'bytes' before calling this method.
@@ -247,6 +252,9 @@ class ReservationTracker {
   /// 'lock_' must be held by caller.
   void CheckConsistency() const;
 
+  /// Same as AllocateFrom() except 'lock_' must be held by caller.
+  void AllocateFromLocked(int64_t bytes);
+
   /// Increase or decrease 'used_reservation_' and update profile counters accordingly.
   /// 'lock_' must be held by caller.
   void UpdateUsedReservation(int64_t delta);


[2/2] impala git commit: IMPALA-4456: Address scalability issues of qs_map_lock_ and client_request_state_map_lock_

Posted by ta...@apache.org.
IMPALA-4456: Address scalability issues of qs_map_lock_ and client_request_state_map_lock_

The following 2 locks have shown to be frequent points of contention
on recent perf runs:

- qs_map_lock_
- client_request_state_map_lock_

Since these are process wide locks, any threads waiting on these locks
potentially slow down the runtime of a query.

I tried to address this previously by converting the client_request_state_map_lock_
to a reader-writer lock. This showed great perf improvements in the general
case, however, there were edge cases with big regressions as well.
In the general case, strict readers of the map got through so quickly
that we were able to see a reduction in the number of client connections
created, since this lock was contended for in the context of Thrift threads too.
The bad case is when writers were starved trying to register a new query
since there were so many readers. Changing the starve option resulted in
worse read performance all round.

Another approach which is relatively simpler is to shard the locks, which
proves to be very effective with no regressions. The maps and locks are
sharded to a default of 4 buckets initally.

Query IDs are created by using boost::uuids::random_generator. We use the
high bits of a query ID to assign queries to buckets. I verified that the
distribution of the high bits of a query ID are even across buckets on
my local machine:

For 10,000 queries sharded across 4 buckets, the distribution was:
bucket[0]: 2500
bucket[1]: 2489
bucket[2]: 2566
bucket[3]: 2445

A micro-benchmark is added to measure the improvement in performance. This
benchmark creates multiple threads each of which creates a QueryState and
accesses it multiple times. We can see improvements in the range 2x - 3.5x.

BEFORE:
------------------Benchmark 1: Create and access Query States.
Total Time (#Queries: 5 #Accesses: 100) : 1ms
Total Time (#Queries: 50 #Accesses: 100) : 8ms
Total Time (#Queries: 50 #Accesses: 1000) : 54ms
Total Time (#Queries: 500 #Accesses: 100) : 76ms
Total Time (#Queries: 500 #Accesses: 1000) : 543ms

AFTER:
------------------Benchmark 1: Create and access Query States.
Total Time (#Queries: 5 #Accesses: 100) : 2173.59K clock cycles
Total Time (#Queries: 50 #Accesses: 100) : 4ms
Total Time (#Queries: 50 #Accesses: 1000) : 15ms
Total Time (#Queries: 500 #Accesses: 100) : 46ms
Total Time (#Queries: 500 #Accesses: 1000) : 151ms

This change introduces a ShardedQueryMap, which is used to replace
the QueryExecMgr::qs_map_ and the ImpalaServer::client_request_state_map_,
and their corresponding locks, thereby abstracting away the access to the
maps locks.

For operations that need to happen on every entry in the ShardedQueryMap
maps, a new function ShardedQueryMap::DoFuncForAllEntries() is
introduced which takes a user supplied lambda and passes it every individual
map entry and executes it.

NOTE: This microbenchmark has shown that SpinLock has better performance
than boost::mutex for the qs_map_lock_'s, so that change has been made
too.

TODO: Add benchmark for client_request_state_map_lock_ too. The APIs
around that are more complicated, so this patch only includes
the benchmarking of qs_map_lock_.

TODO 2: Consider adopting the ShardedQueryMapTemplate for the SessionStateMap.

Change-Id: I61089090e1095da45a8a64ed3ccc78bd310807f1
Reviewed-on: http://gerrit.cloudera.org:8080/8363
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/19ab465b
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/19ab465b
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/19ab465b

Branch: refs/heads/master
Commit: 19ab465b391167e99658c9270a2a3d76b2b2652f
Parents: cce0b2d
Author: Sailesh Mukil <sa...@apache.org>
Authored: Fri Oct 6 16:27:47 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 15 05:04:50 2018 +0000

----------------------------------------------------------------------
 be/src/benchmarks/CMakeLists.txt                |   1 +
 .../benchmarks/process-wide-locks-benchmark.cc  | 174 +++++++++++++++++++
 be/src/runtime/query-exec-mgr.cc                |  32 ++--
 be/src/runtime/query-exec-mgr.h                 |  10 +-
 be/src/service/impala-http-handler.cc           |  23 ++-
 be/src/service/impala-server.cc                 |  34 ++--
 be/src/service/impala-server.h                  |  29 ++--
 be/src/util/sharded-query-map-util.h            | 135 ++++++++++++++
 8 files changed, 379 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/benchmarks/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/CMakeLists.txt b/be/src/benchmarks/CMakeLists.txt
index a569a66..02fbaad 100644
--- a/be/src/benchmarks/CMakeLists.txt
+++ b/be/src/benchmarks/CMakeLists.txt
@@ -48,6 +48,7 @@ ADD_BE_BENCHMARK(multiint-benchmark)
 ADD_BE_BENCHMARK(network-perf-benchmark)
 ADD_BE_BENCHMARK(overflow-benchmark)
 ADD_BE_BENCHMARK(parse-timestamp-benchmark)
+ADD_BE_BENCHMARK(process-wide-locks-benchmark)
 ADD_BE_BENCHMARK(row-batch-serialize-benchmark)
 ADD_BE_BENCHMARK(scheduler-benchmark)
 ADD_BE_BENCHMARK(status-benchmark)

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/benchmarks/process-wide-locks-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/process-wide-locks-benchmark.cc b/be/src/benchmarks/process-wide-locks-benchmark.cc
new file mode 100644
index 0000000..ffe4268
--- /dev/null
+++ b/be/src/benchmarks/process-wide-locks-benchmark.cc
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <iostream>
+
+#include <boost/bind.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+#include <boost/uuid/uuid_io.hpp>
+
+#include "runtime/query-exec-mgr.h"
+#include "runtime/query-state.h"
+#include "runtime/test-env.h"
+#include "scheduling/request-pool-service.h"
+#include "service/fe-support.h"
+#include "util/benchmark.h"
+#include "util/cpu-info.h"
+#include "util/debug-util.h"
+#include "util/metrics.h"
+#include "util/pretty-printer.h"
+#include "util/stopwatch.h"
+#include "util/thread.h"
+#include "util/uid-util.h"
+
+#include "common/init.h"
+#include "common/names.h"
+
+/// This tests the performance of the following process wide locks:
+//
+/// 1. qs_map_lock_ (Sharded)
+/// TODO: client_request_state_map_lock_ (Sharded)
+//
+/// A reasonable amount of queries are created and accessed multiple times via the
+/// QueryExecMgr's APIs to benchmark the time taken to acquire the lock and retrieve
+/// the QueryState.
+//
+/// ------------------Benchmark 1: Create and access Query States.
+/// Total Time (#Queries: 5 #Accesses: 100) : 2202.44K clock cycles
+/// Total Time (#Queries: 50 #Accesses: 100) : 4ms
+/// Total Time (#Queries: 50 #Accesses: 1000) : 16ms
+/// Total Time (#Queries: 500 #Accesses: 100) : 46ms
+/// Total Time (#Queries: 500 #Accesses: 1000) : 129ms
+/// Total Time (#Queries: 500 #Accesses: 5000) : 518ms
+/// Total Time (#Queries: 1000 #Accesses: 1000) : 246ms
+/// Total Time (#Queries: 1000 #Accesses: 5000) : 1s018ms
+//
+/// This was created to test improvements for IMPALA-4456.
+
+using boost::uuids::random_generator;
+
+using namespace impala;
+
+boost::scoped_ptr<TestEnv> test_env_;
+vector<TUniqueId> query_ids;
+
+// This function creates a QueryState and accesses it 'num_accesses' times, via the
+// QueryExecMgr APIs.
+// TODO: Add a similar funciton for ClientRequestStates.
+void CreateAndAccessQueryStates(const TUniqueId& query_id, int num_accesses) {
+  TQueryCtx query_ctx;
+  query_ctx.query_id = query_id;
+
+  string resolved_pool;
+  Status s = ExecEnv::GetInstance()->request_pool_service()->ResolveRequestPool(
+      query_ctx, &resolved_pool);
+
+  query_ctx.__set_request_pool(resolved_pool);
+
+  QueryState *query_state;
+  query_state = ExecEnv::GetInstance()->query_exec_mgr()->CreateQueryState(query_ctx);
+  DCHECK(query_state != nullptr);
+  query_state->AcquireExecResourceRefcount();
+
+  for (int i=0; i < num_accesses ; ++i) {
+    QueryState* qs;
+    qs = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id);
+    DCHECK(qs != nullptr);
+    ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(qs);
+  }
+
+  query_state->ReleaseExecResourceRefcount();
+  // This should drop the last reference count to the QueryState and destroy it.
+  ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(query_state);
+  // Make sure that the query doesn't exist in the map any longer.
+  DCHECK(ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id) == nullptr);
+
+}
+
+// Runs 'num_threads' Impala Threads and have each of them execute func().
+void ImpalaThreadStarter(void (*func) (const TUniqueId&, int), int num_threads,
+    int func_arg) {
+  vector<unique_ptr<Thread>> threads;
+  threads.reserve(num_threads);
+
+  for (int i=0; i < num_threads; ++i) {
+    unique_ptr<Thread> thread;
+    function<void ()> f =
+        bind(func, query_ids[i], func_arg);
+    Status s =
+        Thread::Create("mythreadgroup", "thread", f, &thread);
+    DCHECK(s.ok());
+    threads.push_back(move(thread));
+  }
+  for (unique_ptr<Thread>& thread: threads) {
+    thread->Join();
+  }
+}
+
+void RunBenchmark(int num_queries, int num_accesses) {
+  StopWatch total_time;
+  total_time.Start();
+  ImpalaThreadStarter(CreateAndAccessQueryStates, num_queries, num_accesses);
+  total_time.Stop();
+
+  cout << "Total Time " << "(#Queries: " << num_queries << " #Accesses: "
+       << num_accesses << ") : "
+       << PrettyPrinter::Print(total_time.ElapsedTime(), TUnit::CPU_TICKS) << endl;
+}
+
+// Create and store 'num_queries' Query IDs into 'query_ids'.
+void CreateQueryIds(int num_queries) {
+  for (int i=0; i < num_queries; ++i) {
+    query_ids[i] = UuidToQueryId(random_generator()());
+  }
+}
+
+int main(int argc, char **argv) {
+  // Though we don't use the JVM or require FeSupport, the TestEnv class requires it,
+  // so we start them up.
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
+
+  const int MAX_QUERIES = 1000;
+
+  query_ids.reserve(MAX_QUERIES);
+
+  test_env_.reset(new TestEnv());
+  ABORT_IF_ERROR(test_env_->Init());
+
+  CreateQueryIds(MAX_QUERIES);
+
+  cout << "------------------Benchmark 1: Create and access Query States." << endl;
+  RunBenchmark(5, 100);
+  RunBenchmark(50, 100);
+  RunBenchmark(50, 1000);
+  RunBenchmark(500, 100);
+  RunBenchmark(500, 1000);
+  RunBenchmark(500, 5000);
+  RunBenchmark(1000, 1000);
+  RunBenchmark(1000, 5000);
+
+  cout << endl;
+
+  // TODO: Benchmark lock of ClientRequestStates too.
+
+  return 0;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/runtime/query-exec-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc
index 316b712..967dc4b 100644
--- a/be/src/runtime/query-exec-mgr.cc
+++ b/be/src/runtime/query-exec-mgr.cc
@@ -82,9 +82,12 @@ QueryState* QueryExecMgr::GetQueryState(const TUniqueId& query_id) {
   QueryState* qs = nullptr;
   int refcnt;
   {
-    lock_guard<mutex> l(qs_map_lock_);
-    auto it = qs_map_.find(query_id);
-    if (it == qs_map_.end()) return nullptr;
+    ScopedShardedMapRef<QueryState*> map_ref(query_id,
+        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto it = map_ref->find(query_id);
+    if (it == map_ref->end()) return nullptr;
     qs = it->second;
     refcnt = qs->refcnt_.Add(1);
   }
@@ -98,12 +101,15 @@ QueryState* QueryExecMgr::GetOrCreateQueryState(
   QueryState* qs = nullptr;
   int refcnt;
   {
-    lock_guard<mutex> l(qs_map_lock_);
-    auto it = qs_map_.find(query_ctx.query_id);
-    if (it == qs_map_.end()) {
+    ScopedShardedMapRef<QueryState*> map_ref(query_ctx.query_id,
+        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto it = map_ref->find(query_ctx.query_id);
+    if (it == map_ref->end()) {
       // register new QueryState
       qs = new QueryState(query_ctx);
-      qs_map_.insert(make_pair(query_ctx.query_id, qs));
+      map_ref->insert(make_pair(query_ctx.query_id, qs));
       *created = true;
     } else {
       qs = it->second;
@@ -153,18 +159,20 @@ void QueryExecMgr::ReleaseQueryState(QueryState* qs) {
 
   QueryState* qs_from_map = nullptr;
   {
-    // for now, gc right away
-    lock_guard<mutex> l(qs_map_lock_);
-    auto it = qs_map_.find(query_id);
+    ScopedShardedMapRef<QueryState*> map_ref(query_id,
+        &ExecEnv::GetInstance()->query_exec_mgr()->qs_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto it = map_ref->find(query_id);
     // someone else might have gc'd the entry
-    if (it == qs_map_.end()) return;
+    if (it == map_ref->end()) return;
     qs_from_map = it->second;
     DCHECK_EQ(qs_from_map->query_ctx().query_id, query_id);
     int32_t cnt = qs_from_map->refcnt_.Load();
     DCHECK_GE(cnt, 0);
     // someone else might have increased the refcnt in the meantime
     if (cnt > 0) return;
-    qs_map_.erase(it);
+    map_ref->erase(it);
   }
   // TODO: send final status report during gc, but do this from a different thread
   delete qs_from_map;

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/runtime/query-exec-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-exec-mgr.h b/be/src/runtime/query-exec-mgr.h
index 8a0c884..bddd731 100644
--- a/be/src/runtime/query-exec-mgr.h
+++ b/be/src/runtime/query-exec-mgr.h
@@ -23,8 +23,8 @@
 #include <unordered_map>
 
 #include "common/status.h"
-#include "util/uid-util.h"
 #include "gen-cpp/Types_types.h"
+#include "util/sharded-query-map-util.h"
 
 namespace impala {
 
@@ -39,7 +39,7 @@ class FragmentInstanceState;
 /// entry point for gaining refcounted access to a QueryState. It also initiates
 /// query execution.
 /// Thread-safe.
-class QueryExecMgr {
+class QueryExecMgr : public CacheLineAligned {
  public:
   /// Creates QueryState if it doesn't exist and initiates execution of all fragment
   /// instance for this query. All fragment instances hold a reference to their
@@ -64,11 +64,9 @@ class QueryExecMgr {
   void ReleaseQueryState(QueryState* qs);
 
  private:
-  /// protects qs_map_
-  boost::mutex qs_map_lock_;
 
-  /// map from query id to QueryState (owned by us)
-  std::unordered_map<TUniqueId, QueryState*> qs_map_;
+  typedef ShardedQueryMap<QueryState*> QueryStateMap;
+  QueryStateMap qs_map_;
 
   /// Gets the existing QueryState or creates a new one if not present.
   /// 'created' is set to true if it was created, false otherwise.

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/service/impala-http-handler.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-http-handler.cc b/be/src/service/impala-http-handler.cc
index b633f2a..0156023 100644
--- a/be/src/service/impala-http-handler.cc
+++ b/be/src/service/impala-http-handler.cc
@@ -37,6 +37,7 @@
 #include "util/redactor.h"
 #include "util/summary-util.h"
 #include "util/time.h"
+#include "util/uid-util.h"
 #include "util/webserver.h"
 
 #include "common/names.h"
@@ -243,12 +244,11 @@ void ImpalaHttpHandler::QueryProfileEncodedHandler(const Webserver::ArgumentMap&
 
 void ImpalaHttpHandler::InflightQueryIdsHandler(const Webserver::ArgumentMap& args,
     Document* document) {
-  lock_guard<mutex> l(server_->client_request_state_map_lock_);
   stringstream ss;
-  for (const ImpalaServer::ClientRequestStateMap::value_type& request_state:
-       server_->client_request_state_map_) {
-    ss << request_state.second->query_id() << "\n";
-  }
+  server_->client_request_state_map_.DoFuncForAllEntries(
+      [&](const std::shared_ptr<ClientRequestState>& request_state) {
+          ss << request_state->query_id() << "\n";
+      });
   document->AddMember(Webserver::ENABLE_RAW_JSON_KEY, true, document->GetAllocator());
   Value query_ids(ss.str().c_str(), document->GetAllocator());
   document->AddMember("contents", query_ids, document->GetAllocator());
@@ -366,14 +366,11 @@ void ImpalaHttpHandler::QueryStateHandler(const Webserver::ArgumentMap& args,
     Document* document) {
   set<ImpalaServer::QueryStateRecord, ImpalaServer::QueryStateRecordLessThan>
       sorted_query_records;
-  {
-    lock_guard<mutex> l(server_->client_request_state_map_lock_);
-    for (const ImpalaServer::ClientRequestStateMap::value_type& request_state:
-         server_->client_request_state_map_) {
-      // TODO: Do this in the browser so that sorts on other keys are possible.
-      sorted_query_records.insert(ImpalaServer::QueryStateRecord(*request_state.second));
-    }
-  }
+
+  server_->client_request_state_map_.DoFuncForAllEntries(
+      [&](const std::shared_ptr<ClientRequestState>& request_state) {
+          sorted_query_records.insert(ImpalaServer::QueryStateRecord(*request_state));
+      });
 
   Value in_flight_queries(kArrayType);
   int64_t num_waiting_queries = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 3866e40..c43c850 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -921,6 +921,7 @@ void ImpalaServer::PrepareQueryContext(TQueryCtx* query_ctx) {
   // single generator under a lock (since random_generator is not
   // thread-safe).
   query_ctx->query_id = UuidToQueryId(random_generator()());
+
 }
 
 Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
@@ -933,16 +934,19 @@ Status ImpalaServer::RegisterQuery(shared_ptr<SessionState> session_state,
   if (session_state->closed) return Status("Session has been closed, ignoring query.");
   const TUniqueId& query_id = request_state->query_id();
   {
-    lock_guard<mutex> l(client_request_state_map_lock_);
-    ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
-    if (entry != client_request_state_map_.end()) {
+    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
+        &ExecEnv::GetInstance()->impala_server()->client_request_state_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto entry = map_ref->find(query_id);
+    if (entry != map_ref->end()) {
       // There shouldn't be an active query with that same id.
       // (query_id is globally unique)
       stringstream ss;
       ss << "query id " << PrintId(query_id) << " already exists";
       return Status(ErrorMsg(TErrorCode::INTERNAL_ERROR, ss.str()));
     }
-    client_request_state_map_.insert(make_pair(query_id, request_state));
+    map_ref->insert(make_pair(query_id, request_state));
   }
   // Metric is decremented in UnregisterQuery().
   ImpaladMetrics::NUM_QUERIES_REGISTERED->Increment(1L);
@@ -989,14 +993,17 @@ Status ImpalaServer::UnregisterQuery(const TUniqueId& query_id, bool check_infli
 
   shared_ptr<ClientRequestState> request_state;
   {
-    lock_guard<mutex> l(client_request_state_map_lock_);
-    ClientRequestStateMap::iterator entry = client_request_state_map_.find(query_id);
-    if (entry == client_request_state_map_.end()) {
+    ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
+        &ExecEnv::GetInstance()->impala_server()->client_request_state_map_);
+    DCHECK(map_ref.get() != nullptr);
+
+    auto entry = map_ref->find(query_id);
+    if (entry == map_ref->end()) {
       return Status("Invalid or unknown query handle");
     } else {
       request_state = entry->second;
     }
-    client_request_state_map_.erase(entry);
+    map_ref->erase(entry);
   }
 
   request_state->Done();
@@ -2046,12 +2053,15 @@ void ImpalaServer::Join() {
 
 shared_ptr<ClientRequestState> ImpalaServer::GetClientRequestState(
     const TUniqueId& query_id) {
-  lock_guard<mutex> l(client_request_state_map_lock_);
-  ClientRequestStateMap::iterator i = client_request_state_map_.find(query_id);
-  if (i == client_request_state_map_.end()) {
+  ScopedShardedMapRef<std::shared_ptr<ClientRequestState>> map_ref(query_id,
+      &ExecEnv::GetInstance()->impala_server()->client_request_state_map_);
+  DCHECK(map_ref.get() != nullptr);
+
+  auto entry = map_ref->find(query_id);
+  if (entry == map_ref->end()) {
     return shared_ptr<ClientRequestState>();
   } else {
-    return i->second;
+    return entry->second;
   }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/service/impala-server.h
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 237b0cb..abe8694 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -22,11 +22,11 @@
 #include <boost/thread/mutex.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/scoped_ptr.hpp>
-#include <boost/unordered_map.hpp>
 #include <boost/unordered_set.hpp>
 #include <boost/uuid/uuid.hpp>
 #include <boost/uuid/uuid_generators.hpp>
 #include <boost/uuid/uuid_io.hpp>
+#include <unordered_map>
 
 #include "gen-cpp/ImpalaService.h"
 #include "gen-cpp/ImpalaHiveServer2Service.h"
@@ -39,10 +39,10 @@
 #include "util/condition-variable.h"
 #include "util/metrics.h"
 #include "util/runtime-profile.h"
+#include "util/sharded-query-map-util.h"
 #include "util/simple-logger.h"
 #include "util/thread-pool.h"
 #include "util/time.h"
-#include "util/uid-util.h"
 #include "runtime/coordinator.h"
 #include "runtime/runtime-state.h"
 #include "runtime/timestamp-value.h"
@@ -126,7 +126,6 @@ class ClientRequestState;
 /// * uuid_lock_
 /// * catalog_version_lock_
 /// * connection_to_sessions_map_lock_
-/// * client_request_state_map_lock_
 ///
 /// TODO: The state of a running query is currently not cleaned up if the
 /// query doesn't experience any errors at runtime and close() doesn't get called.
@@ -138,8 +137,10 @@ class ClientRequestState;
 class ImpalaServer : public ImpalaServiceIf,
                      public ImpalaHiveServer2ServiceIf,
                      public ThriftServer::ConnectionHandlerIf,
-                     public boost::enable_shared_from_this<ImpalaServer> {
+                     public boost::enable_shared_from_this<ImpalaServer>,
+                     public CacheLineAligned {
  public:
+
   ImpalaServer(ExecEnv* exec_env);
   ~ImpalaServer();
 
@@ -502,8 +503,8 @@ class ImpalaServer : public ImpalaServiceIf,
       std::shared_ptr<SessionState> session_state, bool* registered_exec_state,
       std::shared_ptr<ClientRequestState>* exec_state) WARN_UNUSED_RESULT;
 
-  /// Registers the query exec state with client_request_state_map_ using the globally
-  /// unique query_id and add the query id to session state's open query list.
+  /// Registers the query exec state with client_request_state_map_ using the
+  /// globally unique query_id and add the query id to session state's open query list.
   /// The caller must have checked out the session state.
   Status RegisterQuery(std::shared_ptr<SessionState> session_state,
       const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;
@@ -521,9 +522,9 @@ class ImpalaServer : public ImpalaServiceIf,
       const std::shared_ptr<ClientRequestState>& exec_state) WARN_UNUSED_RESULT;
 
   /// Unregister the query by cancelling it, removing exec_state from
-  /// client_request_state_map_, and removing the query id from session state's in-flight
-  /// query list.  If check_inflight is true, then return an error if the query is not
-  /// yet in-flight.  Otherwise, proceed even if the query isn't yet in-flight (for
+  /// client_request_state_map_, and removing the query id from session state's
+  /// in-flight query list.  If check_inflight is true, then return an error if the query
+  /// is not yet in-flight.  Otherwise, proceed even if the query isn't yet in-flight (for
   /// cleaning up after an error on the query issuing path).
   Status UnregisterQuery(const TUniqueId& query_id, bool check_inflight,
       const Status* cause = NULL) WARN_UNUSED_RESULT;
@@ -623,7 +624,7 @@ class ImpalaServer : public ImpalaServiceIf,
 
   /// Copies a query's state into the query log. Called immediately prior to a
   /// ClientRequestState's deletion. Also writes the query profile to the profile log
-  /// on disk. Must be called with client_request_state_map_lock_ held
+  /// on disk.
   void ArchiveQuery(const ClientRequestState& query);
 
   /// Checks whether the given user is allowed to delegate as the specified do_as_user.
@@ -868,16 +869,12 @@ class ImpalaServer : public ImpalaServiceIf,
   /// when there are sessions that have a timeout.
   ConditionVariable session_timeout_cv_;
 
-  /// map from query id to exec state; ClientRequestState is owned by us and referenced
+  /// maps from query id to exec state; ClientRequestState is owned by us and referenced
   /// as a shared_ptr to allow asynchronous deletion
-  typedef boost::unordered_map<TUniqueId, std::shared_ptr<ClientRequestState>>
+  typedef class ShardedQueryMap<std::shared_ptr<ClientRequestState>>
       ClientRequestStateMap;
   ClientRequestStateMap client_request_state_map_;
 
-  /// Protects client_request_state_map_. See "Locking" in the class comment for lock
-  /// acquisition order.
-  boost::mutex client_request_state_map_lock_;
-
   /// Default query options in the form of TQueryOptions and beeswax::ConfigVariable
   TQueryOptions default_query_options_;
   std::vector<beeswax::ConfigVariable> default_configs_;

http://git-wip-us.apache.org/repos/asf/impala/blob/19ab465b/be/src/util/sharded-query-map-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/sharded-query-map-util.h b/be/src/util/sharded-query-map-util.h
new file mode 100644
index 0000000..59dc2d5
--- /dev/null
+++ b/be/src/util/sharded-query-map-util.h
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SHARDED_QUERY_MAP_UTIL_H
+#define SHARDED_QUERY_MAP_UTIL_H
+
+#include <boost/thread/lock_guard.hpp>
+#include <unordered_map>
+
+#include "gen-cpp/Types_types.h"
+#include "util/aligned-new.h"
+#include "util/spinlock.h"
+#include "util/uid-util.h"
+
+namespace impala {
+
+/// This is a template that can be used for any map that maps from a query ID (TUniqueId)
+/// to some object, and that needs to be sharded. It provides a SpinLock per shard to
+/// synchronize access to each shard of the map. The underlying shard is locked and
+/// accessed by instantiating a ScopedShardedMapRef.
+//
+/// Usage pattern:
+//
+///   typedef ShardedQueryMap<QueryState*> QueryStateMap;
+///   QueryStateMap qs_map_;
+//
+template<typename T>
+class ShardedQueryMap {
+ public:
+
+  // This function takes a lambda which should take a parameter of object 'T' and
+  // runs the lambda for all the entries in the map. The lambda should have a return
+  // type of 'void'..
+  // TODO: If necessary, refactor the lambda signature to allow returning Status objects.
+  void DoFuncForAllEntries(const std::function<void(const T&)>& call) {
+    for (int i = 0; i < NUM_QUERY_BUCKETS; ++i) {
+      boost::lock_guard<SpinLock> l(shards_[i].map_lock_);
+      for (const auto& map_value_ref: shards_[i].map_) {
+        call(map_value_ref.second);
+      }
+    }
+  }
+
+ private:
+  template <typename T2>
+  friend class ScopedShardedMapRef;
+
+  // Number of buckets to split the containers of query IDs into.
+  static constexpr uint32_t NUM_QUERY_BUCKETS = 4;
+
+  // We group the map and its corresponding lock together to avoid false sharing. Since
+  // we will always access a map and its corresponding lock together, it's better if
+  // they can be allocated on the same cache line.
+  struct MapShard : public CacheLineAligned {
+    std::unordered_map<TUniqueId, T> map_;
+    SpinLock map_lock_;
+  };
+  struct MapShard shards_[NUM_QUERY_BUCKETS];
+};
+
+/// Use this class to obtain a locked reference to the underlying map shard
+/// of a ShardedQueryMap, corresponding to the 'query_id'.
+//
+/// Pattern:
+/// {
+///   ScopedShardedMapRef map_ref(qid, sharded_map);
+///   DCHECK(map_ref != nullptr);  <nullptr should never be returned>
+///   ...
+/// }
+//
+/// The caller should ensure that the lifetime of the ShardedQueryMap should be longer
+/// than the lifetime of this scoped class.
+template <typename T>
+class ScopedShardedMapRef {
+ public:
+
+  // Finds the appropriate map that could/should contain 'query_id' and locks it.
+  ScopedShardedMapRef(
+      const TUniqueId& query_id, class ShardedQueryMap<T>* sharded_map) {
+    DCHECK(sharded_map != nullptr);
+    int qs_map_bucket = QueryIdToBucket(query_id);
+    shard_ = &sharded_map->shards_[qs_map_bucket];
+
+    // Lock the corresponding shard.
+    shard_->map_lock_.lock();
+  }
+
+  ~ScopedShardedMapRef() {
+    shard_->map_lock_.DCheckLocked();
+    shard_->map_lock_.unlock();
+  }
+
+  // Returns the shard (map) for the 'query_id' passed to the constructor.
+  // Should never return nullptr.
+  std::unordered_map<TUniqueId, T>* get() {
+    shard_->map_lock_.DCheckLocked();
+    return &shard_->map_;
+  }
+
+  std::unordered_map<TUniqueId, T>* operator->() {
+    shard_->map_lock_.DCheckLocked();
+    return get();
+  }
+
+ private:
+
+  // Return the correct bucket that a query ID would belong to.
+  inline int QueryIdToBucket(const TUniqueId& query_id) {
+    int bucket =
+        static_cast<int>(query_id.hi) % ShardedQueryMap<T>::NUM_QUERY_BUCKETS;
+    DCHECK(bucket < ShardedQueryMap<T>::NUM_QUERY_BUCKETS && bucket >= 0);
+    return bucket;
+  }
+
+  typename ShardedQueryMap<T>::MapShard* shard_;
+  DISALLOW_COPY_AND_ASSIGN(ScopedShardedMapRef);
+};
+
+} // namespace impala
+
+#endif /* SHARDED_QUERY_MAP_UTIL_H */