You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this text conversion.)
Posted to commits@kudu.apache.org by al...@apache.org on 2020/04/17 19:47:19 UTC

[kudu] 02/03: tserver: add locking around an RPC's usage of a Scanner

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 163cd256ae7ef8ef0d05c8595de5ebf7ae558cf7
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Tue Apr 7 14:00:56 2020 -0700

    tserver: add locking around an RPC's usage of a Scanner
    
    Prior to this patch, there was little to prevent multiple RPCs from
    trying to access and work with a single Scanner object other than the
    call sequence ID checks. In practice, those checks were fairly likely to
    prevent races -- there was only a very small window of time between
    looking up a scanner and checking its sequence ID -- but the code was
    still buggy.
    
    In addition, the lack of reasonable locking meant that some variables
    like Scanner::num_rows_returned_ were needlessly atomic. In certain
    workloads this actually took a measurable amount of CPU.
    
    This patch adds a new test that would reliably crash the tserver and
    trigger TSAN warnings prior to the fix. It adds some explicit locking
    around the Scanner object and asserts that most methods on the scanner
    require the lock to be held.
    
    I benchmarked this by running `perf stat` on a tserver while using 'kudu
    perf table_scan' to count the rows in a table 200 times (i.e., a scan
    with no columns projected). This is the best case for seeing the value
    of the optimization.
    
    Before:
     Performance counter stats for './build/latest/bin/kudu tserver run -fs-wal-dir /tmp/ts':
    
             11,753.59 msec task-clock                #    1.166 CPUs utilized
                 7,174      context-switches          #    0.610 K/sec
                   807      cpu-migrations            #    0.069 K/sec
                14,909      page-faults               #    0.001 M/sec
        32,447,480,644      cycles                    #    2.761 GHz
        49,275,463,523      instructions              #    1.52  insn per cycle
         7,570,941,653      branches                  #  644.139 M/sec
            14,271,190      branch-misses             #    0.19% of all branches
    
          10.082020598 seconds time elapsed
    
          11.561886000 seconds user
           0.340585000 seconds sys
    
    After:
     Performance counter stats for './build/latest/bin/kudu.opt tserver run -fs-wal-dir /tmp/ts':
    
              9,426.45 msec task-clock                #    1.010 CPUs utilized
                 6,906      context-switches          #    0.733 K/sec
                   892      cpu-migrations            #    0.095 K/sec
                15,078      page-faults               #    0.002 M/sec
        26,127,343,920      cycles                    #    2.772 GHz
        48,101,748,066      instructions              #    1.84  insn per cycle
         7,402,811,470      branches                  #  785.323 M/sec
            13,857,599      branch-misses             #    0.19% of all branches
    
           9.335786317 seconds time elapsed
    
           9.258446000 seconds user
           0.315547000 seconds sys
    
    (about 19.5% fewer cycles for the same work; the old code used 1.24x as many cycles)
    
    When I actually scan one column of data, the improvement is only about
    1%, since other CPU costs dominate.
    
    Change-Id: I3591e85a07aefadaf7fb05768109c8a261a8828e
    Reviewed-on: http://gerrit.cloudera.org:8080/15677
    Tested-by: Todd Lipcon <to...@apache.org>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tserver/scanners-test.cc      |   5 +-
 src/kudu/tserver/scanners.cc           |  77 +++++++------
 src/kudu/tserver/scanners.h            | 194 ++++++++++++++++++++++-----------
 src/kudu/tserver/tablet_server-test.cc |  70 ++++++++++++
 src/kudu/tserver/tablet_service.cc     |  61 ++++++-----
 5 files changed, 276 insertions(+), 131 deletions(-)

diff --git a/src/kudu/tserver/scanners-test.cc b/src/kudu/tserver/scanners-test.cc
index a3d4c11..6e30036 100644
--- a/src/kudu/tserver/scanners-test.cc
+++ b/src/kudu/tserver/scanners-test.cc
@@ -86,7 +86,10 @@ TEST(ScannerTest, TestExpire) {
   mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s1);
   mgr.NewScanner(null_replica, RemoteUser(), RowFormatFlags::NO_FLAGS, &s2);
   SleepFor(MonoDelta::FromMilliseconds(200));
-  s2->UpdateAccessTime();
+  {
+    // Update the access time by locking and unlocking.
+    auto access = s2->LockForAccess();
+  }
   mgr.RemoveExpiredScanners();
   ASSERT_EQ(1, mgr.CountActiveScanners());
   ASSERT_EQ(1, mgr.metrics_->scanners_expired->value());
diff --git a/src/kudu/tserver/scanners.cc b/src/kudu/tserver/scanners.cc
index 71cf9b4..41ea594 100644
--- a/src/kudu/tserver/scanners.cc
+++ b/src/kudu/tserver/scanners.cc
@@ -21,6 +21,7 @@
 #include <functional>
 #include <memory>
 #include <mutex>
+#include <numeric>
 #include <ostream>
 
 #include <gflags/gflags.h>
@@ -30,6 +31,7 @@
 #include "kudu/common/iterator.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
+#include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/hash/string_hash.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
@@ -188,13 +190,13 @@ bool ScannerManager::UnregisterScanner(const string& scanner_id) {
       return false;
     }
 
-    bool is_initialized = it->second->IsInitialized();
-    if (is_initialized) {
-      descriptor = it->second->descriptor();
+    bool is_initted = it->second->is_initted();
+    if (is_initted) {
+      descriptor = it->second->Descriptor();
       descriptor.state = it->second->iter()->HasNext() ? ScanState::kFailed : ScanState::kComplete;
     }
     stripe.scanners_by_id_.erase(it);
-    if (!is_initialized) {
+    if (!is_initted) {
       return true;
     }
   }
@@ -227,8 +229,8 @@ vector<ScanDescriptor> ScannerManager::ListScans() const {
   for (const ScannerMapStripe* stripe : scanner_maps_) {
     shared_lock<RWMutex> l(stripe->lock_);
     for (const auto& se : stripe->scanners_by_id_) {
-      if (se.second->IsInitialized()) {
-        ScanDescriptor desc = se.second->descriptor();
+      if (se.second->is_initted()) {
+        ScanDescriptor desc = se.second->Descriptor();
         desc.state = ScanState::kActive;
         EmplaceOrDie(&scans, se.first, std::move(desc));
       }
@@ -280,8 +282,8 @@ void ScannerManager::RemoveExpiredScanners() {
           scanner->tablet_id(),
           idle_time.ToMilliseconds(),
           scanner_ttl.ToMilliseconds());
-      if (scanner->IsInitialized()) {
-        descriptors.emplace_back(scanner->descriptor());
+      if (scanner->is_initted()) {
+        descriptors.emplace_back(scanner->Descriptor());
       }
       it = stripe->scanners_by_id_.erase(it);
       if (metrics_) {
@@ -320,11 +322,12 @@ Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica,
     : id_(std::move(id)),
       tablet_replica_(tablet_replica),
       remote_user_(std::move(remote_user)),
-      call_seq_id_(0),
       start_time_(MonoTime::Now()),
+      row_format_flags_(row_format_flags),
       metrics_(metrics),
       arena_(256),
-      row_format_flags_(row_format_flags),
+      last_access_time_(start_time_),
+      call_seq_id_(0),
       num_rows_returned_(0) {
   if (tablet_replica_) {
     auto tablet = tablet_replica->shared_tablet();
@@ -332,7 +335,6 @@ Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica,
       tablet->metrics()->tablet_active_scanners->Increment();
     }
   }
-  UpdateAccessTime();
 }
 
 Scanner::~Scanner() {
@@ -347,38 +349,46 @@ Scanner::~Scanner() {
   }
 }
 
-void Scanner::UpdateAccessTime() {
-  std::lock_guard<simple_spinlock> l(lock_);
-  last_access_time_ = MonoTime::Now();
-}
-
 void Scanner::AddTimings(const CpuTimes& elapsed) {
-  std::lock_guard<simple_spinlock> l(lock_);
+  std::unique_lock<RWMutex> l(cpu_times_lock_);
   cpu_times_.Add(elapsed);
 }
 
 void Scanner::Init(unique_ptr<RowwiseIterator> iter,
-                   unique_ptr<ScanSpec> spec) {
-  std::lock_guard<simple_spinlock> l(lock_);
+                   unique_ptr<ScanSpec> spec,
+                   unique_ptr<Schema> client_projection) {
+  lock_.AssertAcquired();
   CHECK(!iter_) << "Already initialized";
   iter_ = std::move(iter);
   spec_ = std::move(spec);
+  client_projection_schema_ = std::move(client_projection);
+  initted_.store(true, std::memory_order_release);
 }
 
 const ScanSpec& Scanner::spec() const {
   return *spec_;
 }
 
-void Scanner::GetIteratorStats(vector<IteratorStats>* stats) const {
-  iter_->GetIteratorStats(stats);
+IteratorStats Scanner::UpdateStatsAndGetDelta() {
+  // Here we have to dig into the per-column iterator stats, sum them up, and then
+  // subtract out the total that we already reported in a previous scan.
+  lock_.AssertAcquired();
+  vector<IteratorStats> stats_by_col;
+  iter_->GetIteratorStats(&stats_by_col);
+  IteratorStats total_stats = std::accumulate(stats_by_col.begin(),
+                                              stats_by_col.end(),
+                                              IteratorStats());
+  IteratorStats delta_stats = total_stats - already_reported_stats_;
+  already_reported_stats_ = total_stats;
+  return delta_stats;
 }
 
-ScanDescriptor Scanner::descriptor() const {
+ScanDescriptor Scanner::Descriptor() const {
   // Ignore non-initialized scans. The initializing state is transient, and
   // handling it correctly is complicated. Since the scanner is initialized we
-  // can assume iter(), spec(), and client_projection_schema() return valid
+  // can assume iter_, spec_, and client_projection_schema_ are valid
   // pointers.
-  CHECK(IsInitialized());
+  CHECK(is_initted());
 
   ScanDescriptor descriptor;
   descriptor.tablet_id = tablet_id();
@@ -386,7 +396,7 @@ ScanDescriptor Scanner::descriptor() const {
   descriptor.remote_user = remote_user();
   descriptor.start_time = start_time_;
 
-  for (const auto& column : client_projection_schema()->columns()) {
+  for (const auto& column : client_projection_schema_->columns()) {
     descriptor.projected_columns.emplace_back(column.name());
   }
 
@@ -408,26 +418,23 @@ ScanDescriptor Scanner::descriptor() const {
   }
 
   vector<IteratorStats> iterator_stats;
-  GetIteratorStats(&iterator_stats);
+  iter_->GetIteratorStats(&iterator_stats);
 
-  DCHECK_EQ(iterator_stats.size(), iter()->schema().num_columns());
+  DCHECK_EQ(iterator_stats.size(), iter_->schema().num_columns());
   for (int col_idx = 0; col_idx < iterator_stats.size(); col_idx++) {
-    descriptor.iterator_stats.emplace_back(iter()->schema().column(col_idx).name(),
+    descriptor.iterator_stats.emplace_back(iter_->schema().column(col_idx).name(),
                                            iterator_stats[col_idx]);
   }
 
-  {
-    std::lock_guard<simple_spinlock> l(lock_);
-    descriptor.last_call_seq_id = call_seq_id_;
-    descriptor.last_access_time = last_access_time_;
-    descriptor.cpu_times = cpu_times_;
-  }
+  descriptor.last_call_seq_id = ANNOTATE_UNPROTECTED_READ(call_seq_id_);
+  descriptor.last_access_time = last_access_time_.load(std::memory_order_relaxed);
+  descriptor.cpu_times = cpu_times();
 
   return descriptor;
 }
 
 CpuTimes Scanner::cpu_times() const {
-  std::lock_guard<simple_spinlock> l(lock_);
+  shared_lock<RWMutex> l(cpu_times_lock_);
   return cpu_times_;
 }
 
diff --git a/src/kudu/tserver/scanners.h b/src/kudu/tserver/scanners.h
index 5f81763..4c64fb3 100644
--- a/src/kudu/tserver/scanners.h
+++ b/src/kudu/tserver/scanners.h
@@ -31,15 +31,14 @@
 
 #include "kudu/common/iterator_stats.h"
 #include "kudu/common/scan_spec.h"
-#include "kudu/common/schema.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/remote_user.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/util/auto_release_pool.h"
 #include "kudu/util/condition_variable.h"
-#include "kudu/util/locks.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -51,6 +50,7 @@
 namespace kudu {
 
 class RowwiseIterator;
+class Schema;
 class Status;
 class Thread;
 
@@ -192,32 +192,88 @@ class ScopedUnregisterScanner {
 };
 
 // An open scanner on the server side.
+//
+// NOTE: unless otherwise specified, all methods of this class require that the
+// caller has acquired the access lock using Scanner::LockForAccess(). It's assumed
+// that any RPC related to a scanner will acquire that lock, so that only a single
+// RPC thread works on a given scanner at a time.
 class Scanner {
  public:
+  class AccessLock {
+   public:
+    AccessLock(AccessLock&& l) noexcept
+        : s_(l.s_),
+          lock_(std::move(l.lock_)) {
+    }
+    ~AccessLock() {
+      if (lock_.owns_lock()) {
+        Unlock();
+      }
+    }
+    void Unlock() {
+      s_->last_access_time_.store(MonoTime::Now(), std::memory_order_relaxed);
+      lock_.unlock();
+    }
+    bool owns_lock() {
+      return lock_.owns_lock();
+    }
+
+   private:
+    friend class Scanner;
+    explicit AccessLock(Scanner* s)
+        : s_(DCHECK_NOTNULL(s)),
+          lock_(s->lock_) {
+    }
+    AccessLock(Scanner* s, std::try_to_lock_t try_lock)
+        : s_(DCHECK_NOTNULL(s)),
+          lock_(s->lock_, try_lock) {
+    }
+
+    Scanner* const s_;
+    std::unique_lock<Mutex> lock_;
+  };
+
   Scanner(std::string id,
           const scoped_refptr<tablet::TabletReplica>& tablet_replica,
           rpc::RemoteUser remote_user, ScannerMetrics* metrics,
           uint64_t row_format_flags);
   ~Scanner();
 
-  // Attach an actual iterator and a ScanSpec to this Scanner.
-  // Takes ownership of 'iter' and 'spec'.
+  // Lock this scanner for the purposes of an RPC.
+  //
+  // While the lock is held, the TimeSinceLastAccess() method will return 0, indicating
+  // that a call is actively being processed. Upon destruction of the returned Lock
+  // object, the last-access time will be set to the current time and the internal lock
+  // released.
+  AccessLock LockForAccess() WARN_UNUSED_RESULT {
+    return AccessLock(this);
+  }
+
+  // Try to lock the scanner, but do not wait in the case that the scanner
+  // is already locked by another thread.
+  //
+  // Check result.owns_lock() to see if the lock was successful.
+  AccessLock TryLockForAccess() WARN_UNUSED_RESULT {
+    return AccessLock(this, std::try_to_lock);
+  }
+
+  // Mark the scanner as initialized. This indicates that it successfully
+  // created an iterator, passed validation, etc, and will allow it to
+  // show up in the scanner dashboard.
   void Init(std::unique_ptr<RowwiseIterator> iter,
-            std::unique_ptr<ScanSpec> spec);
+            std::unique_ptr<ScanSpec> spec,
+            std::unique_ptr<Schema> client_projection);
 
   RowwiseIterator* iter() {
+    lock_.AssertAcquired();
     return DCHECK_NOTNULL(iter_.get());
   }
 
   const RowwiseIterator* iter() const {
+    lock_.AssertAcquired();
     return DCHECK_NOTNULL(iter_.get());
   }
 
-  // Update the last-access time to the current time,
-  // delaying the expiration of the Scanner for another TTL
-  // period.
-  void UpdateAccessTime();
-
   // Add the timings in 'elapsed' to the total timings for this scanner.
   void AddTimings(const CpuTimes& elapsed);
 
@@ -229,6 +285,7 @@ class Scanner {
   }
 
   Arena* arena() {
+    lock_.AssertAcquired();
     return &arena_;
   }
 
@@ -248,73 +305,71 @@ class Scanner {
 
   // Returns the current call sequence ID of the scanner.
   uint32_t call_seq_id() const {
-    std::lock_guard<simple_spinlock> l(lock_);
+    lock_.AssertAcquired();
     return call_seq_id_;
   }
 
   // Increments the call sequence ID.
   void IncrementCallSeqId() {
-    std::lock_guard<simple_spinlock> l(lock_);
-    call_seq_id_ += 1;
+    lock_.AssertAcquired();
+    call_seq_id_++;
   }
 
   // Return the delta from the last time this scan was updated to 'now'.
   MonoDelta TimeSinceLastAccess(const MonoTime& now) const {
-    std::lock_guard<simple_spinlock> l(lock_);
-    return now - last_access_time_;
+    std::unique_lock<Mutex> l(lock_, std::try_to_lock);
+    if (l.owns_lock()) {
+      return now - last_access_time_;
+    }
+    return MonoDelta::FromMilliseconds(0);
   }
 
   // Returns the time this scan was started.
   const MonoTime& start_time() const { return start_time_; }
 
-  // Associate a projection schema with the Scanner. The scanner takes
-  // ownership of 'client_projection_schema'.
+
+  // Returns client's projection schema.
   //
-  // Note: 'client_projection_schema' is set if the client's
-  // projection is a subset of the iterator's schema -- the iterator's
-  // schema needs to include all columns that have predicates, whereas
-  // the client may not want to project all of them.
-  void set_client_projection_schema(std::unique_ptr<Schema> client_projection_schema) {
-    client_projection_schema_ = std::move(client_projection_schema);
+  // This may differ from the schema used by the iterator, which must contain all columns
+  // used as predicates).
+  const Schema* client_projection_schema() const {
+    lock_.AssertAcquired();
+    return DCHECK_NOTNULL(client_projection_schema_.get());
   }
 
-  // Returns request's projection schema if it differs from the schema
-  // used by the iterator (which must contain all columns used as
-  // predicates). Returns NULL if the iterator's schema is the same as
-  // the projection schema.
-  // See the note about 'set_client_projection_schema' above.
-  const Schema* client_projection_schema() const { return client_projection_schema_.get(); }
-
-  // Get per-column stats for each iterator.
-  void GetIteratorStats(std::vector<IteratorStats>* stats) const;
-
-  const IteratorStats& already_reported_stats() const {
-    return already_reported_stats_;
-  }
-  void set_already_reported_stats(const IteratorStats& stats) {
-    already_reported_stats_ = stats;
-  }
+  // Update the stats from the underlying scanner and return a delta since the
+  // previous call to this method.
+  IteratorStats UpdateStatsAndGetDelta();
 
   uint64_t row_format_flags() const {
+    lock_.AssertAcquired();
     return row_format_flags_;
   }
 
   void add_num_rows_returned(int64_t num_rows_added) {
+    lock_.AssertAcquired();
     num_rows_returned_ += num_rows_added;
     DCHECK_LE(num_rows_added, num_rows_returned_);
   }
 
   int64_t num_rows_returned() const {
+    lock_.AssertAcquired();
     return num_rows_returned_;
   }
 
   bool has_fulfilled_limit() const {
-    std::lock_guard<simple_spinlock> l(lock_);
+    lock_.AssertAcquired();
     return spec_ && spec_->has_limit() && num_rows_returned_ >= spec_->limit();
   }
 
-  ScanDescriptor descriptor() const;
+  // Return a descriptor of the current state of this scan.
+  // Does not require the AccessLock.
+  //
+  // REQUIRES: is_initted() must be true.
+  ScanDescriptor Descriptor() const;
 
+  // Returns the amount of CPU time accounted to this scanner.
+  // Does not require the AccessLock.
   CpuTimes cpu_times() const;
 
  private:
@@ -325,9 +380,8 @@ class Scanner {
   // Return true if the scanner has been initialized (i.e has an iterator).
   // Once a Scanner is initialized, it is safe to assume that iter() and spec()
   // return non-NULL for the lifetime of the Scanner object.
-  bool IsInitialized() const {
-    std::lock_guard<simple_spinlock> l(lock_);
-    return iter_ != nullptr;
+  bool is_initted() const {
+    return initted_.load(std::memory_order_acquire);
   }
 
   // The unique ID of this scanner.
@@ -340,51 +394,60 @@ class Scanner {
   // first request.
   const rpc::RemoteUser remote_user_;
 
-  // The last time that the scanner was accessed.
-  MonoTime last_access_time_;
-
-  // The current call sequence ID.
-  uint32_t call_seq_id_;
-
-  // Protects last_access_time_ call_seq_id_, iter_, spec_
-  mutable simple_spinlock lock_;
-
   // The time the scanner was started.
   const MonoTime start_time_;
 
+  // The row format flags the client passed, if any.
+  const uint64_t row_format_flags_;
+
   // (Optional) scanner metrics struct, for recording scanner's duration.
   ScannerMetrics* metrics_;
 
-  // A summary of the statistics already reported to the metrics system
-  // for this scanner. This allows us to report the metrics incrementally
-  // as the scanner proceeds.
-  IteratorStats already_reported_stats_;
+  AutoReleasePool autorelease_pool_;
+
+  // Arena used for allocations which must last as long as the scanner
+  // itself. This is _not_ used for row data, which is scoped to a single RPC
+  // response.
+  Arena arena_;
+
+  // Protects access to this scanner by a single RPC at a time.
+  mutable Mutex lock_;
+
+  std::atomic<bool> initted_ { false };
 
   // The spec used by 'iter_'
+  // Assumed to be set once initted_ is true.
   std::unique_ptr<ScanSpec> spec_;
 
+  // Assumed to be set once initted_ is true.
   std::unique_ptr<RowwiseIterator> iter_;
 
   // Stores the request's projection schema, if it differs from the
   // schema used by the iterator.
+  // Assumed to be set once initted_ is true.
   std::unique_ptr<Schema> client_projection_schema_;
 
-  AutoReleasePool autorelease_pool_;
+  // The last time that the scanner was accessed.
+  // Only modified under lock_ but can be read outside.
+  std::atomic<MonoTime> last_access_time_;
 
-  // Arena used for allocations which must last as long as the scanner
-  // itself. This is _not_ used for row data, which is scoped to a single RPC
-  // response.
-  Arena arena_;
+  // The current call sequence ID.
+  // Only modified under lock_ but can be read outside.
+  uint32_t call_seq_id_;
 
-  // The row format flags the client passed, if any.
-  const uint64_t row_format_flags_;
+  // A summary of the statistics already reported to the metrics system
+  // for this scanner. This allows us to report the metrics incrementally
+  // as the scanner proceeds.
+  // Protected by lock_.
+  IteratorStats already_reported_stats_;
 
   // The number of rows that have been serialized and sent over the wire by
   // this scanner.
-  std::atomic<int64_t> num_rows_returned_;
+  int64_t num_rows_returned_;
 
   // The cumulative amounts of wall, user cpu, and system cpu time spent on
   // this scanner, in seconds.
+  mutable RWMutex cpu_times_lock_;
   CpuTimes cpu_times_;
 
   DISALLOW_COPY_AND_ASSIGN(Scanner);
@@ -457,7 +520,6 @@ class ScopedAddScannerTiming {
     stopped_ = true;
     sw_.stop();
     scanner_->AddTimings(sw_.elapsed());
-    scanner_->UpdateAccessTime();
     *cpu_times_ = scanner_->cpu_times();
   }
 
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index b4d3dc6..863534a 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -166,6 +166,7 @@ DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(enable_rowset_compaction);
 DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(rowset_metadata_store_keys);
+DECLARE_bool(scanner_unregister_on_invalid_seq_id);
 DECLARE_double(cfile_inject_corruption);
 DECLARE_double(env_inject_eio);
 DECLARE_double(env_inject_full);
@@ -2504,6 +2505,75 @@ TEST_F(TabletServerTest, TestScanYourWrites_PropagatedTimestampInTheFuture) {
   ASSERT_EQ(R"((int32 key=0, int32 int_val=0, string string_val="original0"))", results[0]);
 }
 
+// Test that, if multiple RPCs arrive concurrently for a single scanner, no state is
+// corrupted, etc. We expected errors but no crashes or TSAN issues.
+TEST_F(TabletServerTest, TestConcurrentAccessToOneScanner) {
+  constexpr int kNumRows = 1000;
+  constexpr int kNumThreads = 8;
+  // Perform a write.
+  InsertTestRowsRemote(0, kNumRows, 1, nullptr, kTabletId);
+
+  FLAGS_scanner_batch_size_rows = 10;
+  FLAGS_scanner_unregister_on_invalid_seq_id = false;
+  ScanResponsePB open_resp;
+  NO_FATALS(OpenScannerWithAllColumns(&open_resp));
+
+  std::atomic<bool> done { false };
+  vector<thread> threads;
+  // Add a thread which concurrently lists scans. The ListScans() function accesses
+  // scanners to expose diagnostic info such as stats, etc, so calling it in a loop can
+  // help uncover potential races in this code path.
+  threads.emplace_back(
+      [&]() {
+        while (!done) {
+          mini_server_->server()->scanner_manager()->ListScans();
+        }
+      });
+  std::atomic<int> next_seq_id { 1 };
+  std::atomic<int> total_rows { 0 };
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back(
+        [&]() {
+          while (!done) {
+            RpcController rpc;
+            rpc.set_timeout(MonoDelta::FromSeconds(10));
+            ScanRequestPB req;
+            ScanResponsePB resp;
+            req.set_scanner_id(open_resp.scanner_id());
+            // Try either the expected next sequence ID or the following one.
+            // We sometimes try the following one since otherwise the race of two
+            // RPCs processing concurrently is very very narrow -- we increment the
+            // call ID immediately after looking up the scanner.
+            req.set_call_seq_id(next_seq_id + (rand() % 2));
+            req.set_batch_size_bytes(100);
+            Status s = proxy_->Scan(req, &resp, &rpc);
+            CHECK_OK(s);
+            if (resp.has_error()) {
+              if (resp.error().code() == TabletServerErrorPB::SCANNER_EXPIRED) {
+                break;
+              }
+              CHECK(resp.error().code() == TabletServerErrorPB::INVALID_SCAN_CALL_SEQ_ID)
+                  << "unexpected error: " << resp.error().DebugString();
+              VLOG(1) << "Got an invalid seq id " << req.call_seq_id();
+            } else {
+              VLOG(1) << "Continued scan: " << resp.data().num_rows() << " rows";
+              total_rows += resp.data().num_rows();
+              next_seq_id++;
+              if (!resp.has_more_results()) {
+                VLOG(1) << "Finished scan";
+                done = true;
+              }
+            }
+          }
+        });
+  }
+  for (auto& t : threads) {
+    t.join();
+  }
+  ASSERT_EQ(total_rows, kNumRows);
+}
+
+
 TEST_F(TabletServerTest, TestScanWithStringPredicates) {
   InsertTestRowsDirect(0, 100);
 
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 927182c..34eda3d 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -23,7 +23,6 @@
 #include <cstring>
 #include <functional>
 #include <memory>
-#include <numeric>
 #include <ostream>
 #include <string>
 #include <type_traits>
@@ -150,6 +149,12 @@ DEFINE_bool(scanner_inject_service_unavailable_on_continue_scan, false,
            "any Scan continuation RPC call. Used for tests.");
 TAG_FLAG(scanner_inject_service_unavailable_on_continue_scan, unsafe);
 
+DEFINE_bool(scanner_unregister_on_invalid_seq_id, true,
+            "If set, an invalid sequence ID will cause a scanner to get unregistered. "
+            "Used for tests.");
+TAG_FLAG(scanner_unregister_on_invalid_seq_id, unsafe);
+
+
 DEFINE_bool(tserver_enforce_access_control, false,
             "If set, the server will apply fine-grained access control rules "
             "to client RPCs.");
@@ -1814,7 +1819,14 @@ void TabletServiceImpl::ScannerKeepAlive(const ScannerKeepAliveRequestPB *req,
     SetupErrorAndRespond(resp->mutable_error(), s, error_code, context);
     return;
   }
-  scanner->UpdateAccessTime();
+  {
+    // Locking for access has the side effect of updating the access time.
+    // Here we do a trylock -- a failure indicates that there is already another
+    // thread currently accessing the scanner, so that thread will update the
+    // access time upon release of the lock.
+    auto lock = scanner->TryLockForAccess();
+  }
+
   context->RespondSuccess();
 }
 
@@ -2355,18 +2367,14 @@ static Status SetupScanSpec(const NewScanRequestPB& scan_pb,
     const void* lower_bound = nullptr;
     const void* upper_bound = nullptr;
     if (pred_pb.has_lower_bound()) {
-      const void* val;
       RETURN_NOT_OK(ExtractPredicateValue(*col, pred_pb.lower_bound(),
                                           scanner->arena(),
-                                          &val));
-      lower_bound = val;
+                                          &lower_bound));
     }
     if (pred_pb.has_inclusive_upper_bound()) {
-      const void* val;
       RETURN_NOT_OK(ExtractPredicateValue(*col, pred_pb.inclusive_upper_bound(),
                                           scanner->arena(),
-                                          &val));
-      upper_bound = val;
+                                          &upper_bound));
     }
 
     auto pred = ColumnPredicate::InclusiveRange(*col, lower_bound, upper_bound, scanner->arena());
@@ -2456,6 +2464,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
                                          scan_pb.row_format_flags(),
                                          &scanner);
   TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
+  auto scanner_lock = scanner->LockForAccess();
 
   // If we early-exit out of this function, automatically unregister
   // the scanner.
@@ -2511,12 +2520,6 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   // remove unnecessary predicates.
   vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);
 
-  // Store the original projection.
-  {
-    unique_ptr<Schema> orig_projection(new Schema(projection));
-    scanner->set_client_projection_schema(std::move(orig_projection));
-  }
-
   // Build a new projection with the projection columns and the missing columns,
   // annotating each column as a key column appropriately.
   //
@@ -2555,6 +2558,10 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
       CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ false));
     }
   }
+
+  // Store the client's specified projection, prior to adding any missing
+  // columns for predicates, etc.
+  unique_ptr<Schema> client_projection(new Schema(std::move(projection)));
   projection = projection_builder.BuildWithoutIds();
   VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO);
 
@@ -2688,7 +2695,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
     return Status::OK();
   }
 
-  scanner->Init(std::move(iter), std::move(orig_spec));
+  scanner->Init(std::move(iter), std::move(orig_spec), std::move(client_projection));
 
   // Stop the scanner timer because ContinueScanRequest starts its own timer.
   scanner_timer.Stop();
@@ -2707,6 +2714,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
     // from the first half that is no longer executed in this codepath.
     ScanRequestPB continue_req(*req);
     continue_req.set_scanner_id(scanner->id());
+    scanner_lock.Unlock();
     RETURN_NOT_OK(HandleContinueScanRequest(&continue_req, rpc_context, result_collector,
                                             has_more_results, error_code));
   } else {
@@ -2729,9 +2737,6 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
 
   size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
 
-  // TODO(todd): need some kind of concurrency control on these scanner objects
-  // in case multiple RPCs hit the same scanner at the same time. Probably
-  // just a trylock and fail the RPC if it contends.
   SharedScanner scanner;
   TabletServerErrorPB::Code code = TabletServerErrorPB::UNKNOWN_ERROR;
   Status s = server_->scanner_manager()->LookupScanner(req->scanner_id(),
@@ -2748,6 +2753,10 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
     *error_code = code;
     return s;
   }
+  // TODO(todd) consider TryLockForAccess and return ServiceUnavailable in the case that
+  // another thread is already using the scanner? This should be rare in real
+  // circumstances -- only relevant when a client performs some retries on timeout.
+  auto scanner_lock = scanner->LockForAccess();
 
   if (PREDICT_FALSE(FLAGS_scanner_inject_service_unavailable_on_continue_scan)) {
     return Status::ServiceUnavailable("Injecting service unavailable status on Scan due to "
@@ -2769,6 +2778,9 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
 
   if (req->call_seq_id() != scanner->call_seq_id()) {
     *error_code = TabletServerErrorPB::INVALID_SCAN_CALL_SEQ_ID;
+    if (!FLAGS_scanner_unregister_on_invalid_seq_id) {
+      unreg_scanner.Cancel();
+    }
     return Status::InvalidArgument("Invalid call sequence ID in scan request");
   }
   scanner->IncrementCallSeqId();
@@ -2854,17 +2866,8 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
     return s;
   }
 
-  // Calculate the number of rows/cells/bytes actually processed. Here we have to dig
-  // into the per-column iterator stats, sum them up, and then subtract out the
-  // total that we already reported in a previous scan.
-  vector<IteratorStats> stats_by_col;
-  scanner->GetIteratorStats(&stats_by_col);
-  IteratorStats total_stats = std::accumulate(stats_by_col.begin(),
-                                              stats_by_col.end(),
-                                              IteratorStats());
-
-  IteratorStats delta_stats = total_stats - scanner->already_reported_stats();
-  scanner->set_already_reported_stats(total_stats);
+  // Calculate the number of rows/cells/bytes actually processed.
+  IteratorStats delta_stats = scanner->UpdateStatsAndGetDelta();
   TRACE_COUNTER_INCREMENT(SCANNER_BYTES_READ_METRIC_NAME, delta_stats.bytes_read);
 
   // Update metrics based on this scan request.