You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2020/08/31 17:09:32 UTC

[kudu] branch master updated: KUDU-1587 part 2: reject write ops if apply queue is overloaded

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new ee3bb83  KUDU-1587 part 2: reject write ops if apply queue is overloaded
ee3bb83 is described below

commit ee3bb83575a051c2feade1f8c159b2902a7160d5
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Thu Aug 13 22:43:32 2020 -0700

    KUDU-1587 part 2: reject write ops if apply queue is overloaded
    
    This patch implements admission control for write requests in tablet
    servers based on the load status of their apply queue. With this change,
    the recently introduced OpApplyQueueTest.ApplyQueueBackpressure scenario
    successfully passes.
    
    If the queue times of the tasks in the apply queue become higher than
    the specified threshold, the apply queue enters overloaded state.  When
    the queue is overloaded, the tablet server rejects incoming write
    requests with some probability.  The longer the queue stays overloaded,
    the greater the probability of rejections.  The apply queue exits the
    overloaded state when queue times drop below the specified threshold.
    
    This new behavior is not yet enabled by default, keeping the legacy
    behavior of unbounded/uncontrolled queue times as is.  To enable it,
    set --tablet_apply_pool_overload_threshold_ms to something greater
    than 0 (e.g., 500).
    
    Change-Id: I6d7688d6fa832e606b8efc4549568fa52dfa1931
    Reviewed-on: http://gerrit.cloudera.org:8080/16343
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../same_tablet_concurrent_writes-itest.cc         | 211 +++++++++++++++++----
 src/kudu/kserver/kserver.cc                        |  28 +--
 src/kudu/kserver/kserver.h                         |  10 +-
 .../master_options.h => kserver/kserver_options.h} |  25 +--
 src/kudu/master/master_options.h                   |   8 +-
 src/kudu/tablet/tablet_replica.cc                  |  46 ++---
 src/kudu/tserver/tablet_server-test.cc             |  13 +-
 src/kudu/tserver/tablet_server.cc                  |  16 +-
 src/kudu/tserver/tablet_server_options.cc          |  20 +-
 src/kudu/tserver/tablet_server_options.h           |   9 +-
 src/kudu/tserver/tablet_server_runner.cc           |  10 +-
 src/kudu/tserver/tablet_service.cc                 |  41 +++-
 src/kudu/tserver/tablet_service.h                  |  11 ++
 13 files changed, 326 insertions(+), 122 deletions(-)

diff --git a/src/kudu/integration-tests/same_tablet_concurrent_writes-itest.cc b/src/kudu/integration-tests/same_tablet_concurrent_writes-itest.cc
index 786f2e5..f67e05a 100644
--- a/src/kudu/integration-tests/same_tablet_concurrent_writes-itest.cc
+++ b/src/kudu/integration-tests/same_tablet_concurrent_writes-itest.cc
@@ -40,6 +40,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/sysinfo.h"
 #include "kudu/mini-cluster/internal_mini_cluster.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
@@ -63,7 +64,10 @@ DECLARE_int32(log_segment_size_mb);
 DECLARE_int32(max_num_columns);
 DECLARE_int32(rpc_num_service_threads);
 DECLARE_int32(rpc_service_queue_length);
+DECLARE_int32(tablet_inject_latency_on_apply_write_op_ms);
+DECLARE_uint32(tablet_apply_pool_overload_threshold_ms);
 
+METRIC_DECLARE_counter(op_apply_queue_overload_rejections);
 METRIC_DECLARE_counter(rpcs_queue_overflow);
 METRIC_DECLARE_counter(rpcs_timed_out_in_queue);
 METRIC_DECLARE_gauge_uint64(spinlock_contention_time);
@@ -101,13 +105,10 @@ namespace itest {
 // are involved in the process of pushing Raft consensus updates corresponding
 // to the incoming write requests, and the test pinpoints the contention among
 // various threads involved.
-class SameTabletConcurrentWritesTest : public KuduTest {
+class SameTabletConcurrentWritesBaseTest: public KuduTest {
  public:
-  static constexpr int kNumColumns = 250;
-
-  SameTabletConcurrentWritesTest()
-      : num_inserter_threads_(FLAGS_num_inserter_threads),
-        runtime_(MonoDelta::FromSeconds(FLAGS_runtime_sec)),
+  explicit SameTabletConcurrentWritesBaseTest(int num_columns)
+      : num_columns_(num_columns),
         do_run_(true) {
   }
 
@@ -118,13 +119,48 @@ class SameTabletConcurrentWritesTest : public KuduTest {
     KuduSchemaBuilder schema_builder;
     schema_builder.AddColumn("key")->Type(client::KuduColumnSchema::INT64)->
         NotNull()->PrimaryKey();
-    for (auto i = 1; i < kNumColumns; ++i) {
+    for (auto i = 1; i < num_columns_; ++i) {
       schema_builder.AddColumn(Substitute("col$0", i))->
           Type(client::KuduColumnSchema::STRING)->NotNull();
     }
     ASSERT_OK(schema_builder.Build(&schema_));
   }
 
+  Status Prepare(int num_tablet_servers) {
+    InternalMiniClusterOptions opts;
+    opts.num_tablet_servers = num_tablet_servers;
+    cluster_.reset(new InternalMiniCluster(env_, opts));
+    RETURN_NOT_OK(cluster_->Start());
+    return CreateTestTable(num_tablet_servers);
+  }
+
+  void Run(size_t num_inserter_threads,
+           const MonoDelta& runtime,
+           vector<thread>* threads,
+           vector<size_t>* counters) {
+    ASSERT_TRUE(threads->empty());
+    threads->reserve(num_inserter_threads);
+    ASSERT_EQ(num_inserter_threads, counters->size());
+    vector<Status> statuses(num_inserter_threads);
+    for (auto idx = 0; idx < num_inserter_threads; ++idx) {
+      threads->emplace_back(&SameTabletConcurrentWritesBaseTest::InserterTask,
+                            this,
+                            idx,
+                            num_inserter_threads,
+                            &(statuses[idx]),
+                            &((*counters)[idx]));
+    }
+
+    SleepFor(runtime);
+
+    do_run_ = false;
+    for_each(threads->begin(), threads->end(), [](thread& t) { t.join(); });
+
+    for (const auto& s : statuses) {
+      EXPECT_OK(s);
+    }
+  }
+
   Status CreateTestTable(int num_replicas) {
     client::sp::shared_ptr<KuduClient> client;
     RETURN_NOT_OK(cluster_->CreateClient(nullptr, &client));
@@ -136,7 +172,10 @@ class SameTabletConcurrentWritesTest : public KuduTest {
         .Create();
   }
 
-  void InserterTask(size_t task_idx, Status* result_status, size_t* counter) {
+  void InserterTask(size_t task_idx,
+                    int64_t key_increment,
+                    Status* result_status,
+                    size_t* counter) {
     using client::sp::shared_ptr;
     static constexpr const char kValPattern[] =
         "$0.00000000000000000000000000000000000000000000000000000000000000000"
@@ -191,13 +230,13 @@ class SameTabletConcurrentWritesTest : public KuduTest {
 
     shared_ptr<KuduTable> table;
     RET_IF_NOT_OK(client->OpenTable(kTableName, &table), session);
-    size_t i = task_idx;
+    int64_t i = task_idx;
     while (do_run_) {
       unique_ptr<KuduInsert> insert(table->NewInsert());
       KuduPartialRow* row = insert->mutable_row();
       RET_IF_NOT_OK(row->SetInt64(0, i), session);
-      i += num_inserter_threads_;
-      for (auto idx = 1; idx < kNumColumns; ++idx) {
+      i += key_increment;
+      for (auto idx = 1; idx < num_columns_; ++idx) {
         RET_IF_NOT_OK(row->SetString(idx, Substitute(kValPattern, i)), session);
       }
       RET_IF_NOT_OK(session->Apply(insert.release()), session);
@@ -211,14 +250,20 @@ class SameTabletConcurrentWritesTest : public KuduTest {
  protected:
   static constexpr const char* const kTableName = "test";
 
-  const int num_inserter_threads_;
-  const MonoDelta runtime_;
+  const int num_columns_;
   InternalMiniClusterOptions opts_;
   std::unique_ptr<InternalMiniCluster> cluster_;
   KuduSchema schema_;
   std::atomic<bool> do_run_;
 };
 
+class SameTabletConcurrentWritesTest: public SameTabletConcurrentWritesBaseTest {
+ public:
+  SameTabletConcurrentWritesTest()
+      : SameTabletConcurrentWritesBaseTest(250/*num_columns*/) {
+  }
+};
+
 // Run many inserters into the same tablet when WAL sync calls are slow due to
 // injected latency; report on metrics like spinlock contention cycles and
 // the number of RPC queue's overflows.
@@ -227,12 +272,6 @@ TEST_F(SameTabletConcurrentWritesTest, InsertsOnly) {
 
   constexpr int kNumTabletServers = 3;
 
-  // Custom settings for kudu-master's flags.
-  //
-  // Increase number of columns in the table to make every write operation
-  // heavier.
-  FLAGS_max_num_columns = kNumColumns;
-
   // Custom settings for kudu-tserver's flags.
   //
   // Inject latency into WAL sync operations. This is to allow the point of
@@ -259,29 +298,14 @@ TEST_F(SameTabletConcurrentWritesTest, InsertsOnly) {
   FLAGS_rpc_num_service_threads = 2;
   FLAGS_rpc_service_queue_length = 3;
 
-  InternalMiniClusterOptions opts;
-  opts.num_tablet_servers = kNumTabletServers;
-  cluster_.reset(new InternalMiniCluster(env_, opts));
-  ASSERT_OK(cluster_->Start());
-  ASSERT_OK(CreateTestTable(kNumTabletServers));
+  ASSERT_OK(Prepare(kNumTabletServers));
 
+  const auto runtime = MonoDelta::FromSeconds(FLAGS_runtime_sec);
+  const size_t num_inserter_threads = FLAGS_num_inserter_threads;
   vector<thread> threads;
-  threads.reserve(num_inserter_threads_);
-  vector<Status> statuses(num_inserter_threads_);
-  vector<size_t> counters(num_inserter_threads_, 0);
-  for (auto idx = 0; idx < num_inserter_threads_; idx++) {
-    threads.emplace_back(&SameTabletConcurrentWritesTest::InserterTask, this,
-                         idx, &statuses[idx], &counters[idx]);
-  }
-
-  SleepFor(runtime_);
-
-  do_run_ = false;
-  for_each(threads.begin(), threads.end(), [](thread& t) { t.join(); });
-
-  for (const auto& s : statuses) {
-    EXPECT_OK(s);
-  }
+  threads.reserve(num_inserter_threads);
+  vector<size_t> counters(num_inserter_threads, 0);
+  NO_FATALS(Run(num_inserter_threads, runtime, &threads, &counters));
 
   int64_t num_queue_overflows = 0;
   for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
@@ -326,10 +350,115 @@ TEST_F(SameTabletConcurrentWritesTest, InsertsOnly) {
   const double total = accumulate(counters.begin(), counters.end(), 0UL);
   LOG(INFO) << Substitute(
       "write RPC request rate: $0 req/sec",
-      total / runtime_.ToSeconds());
+      total / runtime.ToSeconds());
   LOG(INFO) << Substitute(
       "total count of RPC queue overflows: $0", num_queue_overflows);
 }
 
+class OpApplyQueueOverloadedTest: public SameTabletConcurrentWritesBaseTest {
+ public:
+  OpApplyQueueOverloadedTest()
+      : SameTabletConcurrentWritesBaseTest(1/*num_columns*/) {
+  }
+};
+
+// The essence of this test scenario is to make sure that a tablet server
+// responds with appropriate error status if rejecting a write operation when
+// its op apply queue is overloaded. A client should automatically retry
+// the rejected operations and eventually succeed with its workload (of course,
+// the latter depends on the configured session timeout).
+//
+// This scenario injects a delay into the apply phase of every write operation
+// and runs many clients which insert data into the same tablet. The overload
+// threshold for the apply queue is set very close to the injected delay,
+// so many write operations are rejected due to the op apply queue being
+// overloaded. Nevertheless, every client eventually succeeds with its workload
+// because the rejected write operations are automatically retried.
+TEST_F(OpApplyQueueOverloadedTest, ClientRetriesOperations) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+
+  constexpr int kNumTabletServers = 3;
+
+  // A couple of settings below might be overridden from the command line by
+  // specifying corresponding flags. This scenario sets reasonable defaults.
+
+  // The op apply threadpool's max_threads is set to base::NumCPUs(), but we
+  // want to have many more inserters to induce the overload of the op apply
+  // queue faster.
+  const int num_cpus = 3 * base::NumCPUs();
+  ASSERT_NE("", SetCommandLineOptionWithMode("num_inserter_threads",
+                                             std::to_string(num_cpus).c_str(),
+                                             google::SET_FLAG_IF_DEFAULT));
+  // The workload should run long enough to allow the apply queue to detect
+  // the overload condition. Once the queue overload condition is detected,
+  // the tablet server starts rejecting incoming write requests.
+  ASSERT_NE("", SetCommandLineOptionWithMode("runtime_sec",
+                                             "3",
+                                             google::SET_FLAG_IF_DEFAULT));
+
+  // Custom settings for kudu-tserver's flags.
+  //
+  // Inject latency into the op 'apply' phase and set the apply queue's
+  // overload threshold to the same value. This is to make tablet servers
+  // reject many write operations due to overload of the op apply queue.
+  FLAGS_tablet_apply_pool_overload_threshold_ms = 100;
+  FLAGS_tablet_inject_latency_on_apply_write_op_ms = 100;
+  // Also, set the RPC queue length very high to avoid queue overflows. The
+  // idea is to put as much pressure on the apply queue as possible.
+  FLAGS_rpc_service_queue_length = 1000;
+
+  // Custom settings for kudu-master's flags.
+  //
+  // Set the RPC queue length very high to avoid queue overflows: the number
+  // of clients working concurrently might be too high for the default settings
+  // of the master's RPC queue length.
+  FLAGS_rpc_service_queue_length = 1000;
+
+  ASSERT_OK(Prepare(kNumTabletServers));
+
+  const auto runtime = MonoDelta::FromSeconds(FLAGS_runtime_sec);
+  const size_t num_inserter_threads = FLAGS_num_inserter_threads;
+  vector<thread> threads;
+  threads.reserve(num_inserter_threads);
+  vector<size_t> counters(num_inserter_threads, 0);
+  NO_FATALS(Run(num_inserter_threads, runtime, &threads, &counters));
+
+  int64_t num_rejections = 0;
+  for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+    const auto& ent = cluster_->mini_tablet_server(i)->server()->metric_entity();
+
+    for (auto* elem : {
+        &METRIC_op_apply_queue_overload_rejections,
+    }) {
+      auto counter = elem->Instantiate(ent);
+      const char* const name = counter->prototype()->name();
+      LOG(INFO) << "Counter value for tserver " << i
+                << " on " << name << ": " << counter->value();
+      num_rejections += counter->value();
+    }
+
+    for (auto* elem : {
+        &METRIC_op_apply_queue_time,
+        &METRIC_op_apply_run_time,
+    }) {
+      auto hist = elem->Instantiate(ent);
+      ostringstream ostr;
+      ostr << "Stats for tserver " << i << " on " << elem->name() << ":" << endl;
+      hist->histogram()->DumpHumanReadable(&ostr);
+      LOG(INFO) << ostr.str();
+    }
+  }
+
+  // Just for information, print out the resulting request rate.
+  const double total = accumulate(counters.begin(), counters.end(), 0UL);
+  LOG(INFO) << Substitute(
+      "write RPC request rate: $0 req/sec",
+      total / runtime.ToSeconds());
+
+  // At least a few write operations should be rejected due to the overloaded
+  // apply queue.
+  ASSERT_GT(num_rejections, 0);
+}
+
 } // namespace itest
 } // namespace kudu
diff --git a/src/kudu/kserver/kserver.cc b/src/kudu/kserver/kserver.cc
index c645240..d325301 100644
--- a/src/kudu/kserver/kserver.cc
+++ b/src/kudu/kserver/kserver.cc
@@ -36,6 +36,7 @@
 #include "kudu/util/faststring.h"
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 #include "kudu/util/threadpool.h"
 
@@ -55,7 +56,6 @@ static bool ValidateThreadPoolThreadLimit(const char* /*flagname*/, int32_t valu
 }
 DEFINE_validator(server_thread_pool_max_thread_count, &ValidateThreadPoolThreadLimit);
 
-using kudu::server::ServerBaseOptions;
 using std::string;
 using strings::Substitute;
 
@@ -134,22 +134,28 @@ int GetThreadPoolThreadLimit(Env* env) {
 } // anonymous namespace
 
 KuduServer::KuduServer(string name,
-                       const ServerBaseOptions& options,
+                       const KuduServerOptions& opts,
                        const string& metric_namespace)
-    : ServerBase(std::move(name), options, metric_namespace) {
+    : ServerBase(std::move(name), opts, metric_namespace),
+      opts_(opts) {
 }
 
 Status KuduServer::Init() {
   RETURN_NOT_OK(ServerBase::Init());
 
-  ThreadPoolMetrics metrics = {
-      METRIC_op_apply_queue_length.Instantiate(metric_entity_),
-      METRIC_op_apply_queue_time.Instantiate(metric_entity_),
-      METRIC_op_apply_run_time.Instantiate(metric_entity_)
-  };
-  RETURN_NOT_OK(ThreadPoolBuilder("apply")
-                .set_metrics(std::move(metrics))
-                .Build(&tablet_apply_pool_));
+  {
+    ThreadPoolMetrics metrics{
+        METRIC_op_apply_queue_length.Instantiate(metric_entity_),
+        METRIC_op_apply_queue_time.Instantiate(metric_entity_),
+        METRIC_op_apply_run_time.Instantiate(metric_entity_),
+    };
+    ThreadPoolBuilder builder("apply");
+    builder.set_metrics(std::move(metrics));
+    if (opts_.apply_queue_overload_threshold.Initialized()) {
+      builder.set_queue_overload_threshold(opts_.apply_queue_overload_threshold);
+    }
+    RETURN_NOT_OK(builder.Build(&tablet_apply_pool_));
+  }
 
   // These pools are shared by all replicas hosted by this server, and thus
   // are capped at a portion of the overall per-euid thread resource limit.
diff --git a/src/kudu/kserver/kserver.h b/src/kudu/kserver/kserver.h
index 37a7ffc..ef51e31 100644
--- a/src/kudu/kserver/kserver.h
+++ b/src/kudu/kserver/kserver.h
@@ -23,6 +23,7 @@
 
 #include "kudu/gutil/macros.h"
 #include "kudu/gutil/ref_counted.h"
+#include "kudu/kserver/kserver_options.h"
 #include "kudu/server/server_base.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/threadpool.h"
@@ -30,10 +31,6 @@
 namespace kudu {
 class Status;
 
-namespace server {
-struct ServerBaseOptions;
-}
-
 namespace kserver {
 
 // Kudu server instance.
@@ -46,7 +43,7 @@ class KuduServer : public server::ServerBase {
   // Constructs a new KuduServer instance and performs all no-fail member
   // initializations.
   KuduServer(std::string name,
-             const server::ServerBaseOptions& options,
+             const KuduServerOptions& opts,
              const std::string& metric_namespace);
 
   // Finalizes the initialization of a KuduServer by performing any member
@@ -65,6 +62,9 @@ class KuduServer : public server::ServerBase {
   scoped_refptr<AtomicGauge<int32_t>> num_raft_leaders() { return num_raft_leaders_; }
 
  private:
+  // The options that this server was created with.
+  const KuduServerOptions opts_;
+
   // Thread pool for preparing ops, shared between all tablets.
   std::unique_ptr<ThreadPool> tablet_prepare_pool_;
 
diff --git a/src/kudu/master/master_options.h b/src/kudu/kserver/kserver_options.h
similarity index 63%
copy from src/kudu/master/master_options.h
copy to src/kudu/kserver/kserver_options.h
index e1a36a1..b831f59 100644
--- a/src/kudu/master/master_options.h
+++ b/src/kudu/kserver/kserver_options.h
@@ -14,28 +14,23 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_MASTER_MASTER_OPTIONS_H
-#define KUDU_MASTER_MASTER_OPTIONS_H
+#pragma once
 
-#include <vector>
+#include <cstdint>
+#include <memory>
+#include <string>
 
 #include "kudu/server/server_base_options.h"
-#include "kudu/util/net/net_util.h"
+#include "kudu/util/monotime.h"
 
 namespace kudu {
-namespace master {
+namespace kserver {
 
-// Options for constructing the master.
-// These are filled in by gflags by default -- see the .cc file for
-// the list of options and corresponding flags.
-struct MasterOptions : public server::ServerBaseOptions {
-  MasterOptions();
+struct KuduServerOptions : public server::ServerBaseOptions {
+  KuduServerOptions() = default;
 
-  std::vector<HostPort> master_addresses;
-
-  bool IsDistributed() const;
+  MonoDelta apply_queue_overload_threshold;
 };
 
-} // namespace master
+} // namespace kserver
 } // namespace kudu
-#endif /* KUDU_MASTER_MASTER_OPTIONS_H */
diff --git a/src/kudu/master/master_options.h b/src/kudu/master/master_options.h
index e1a36a1..8158f52 100644
--- a/src/kudu/master/master_options.h
+++ b/src/kudu/master/master_options.h
@@ -14,12 +14,11 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_MASTER_MASTER_OPTIONS_H
-#define KUDU_MASTER_MASTER_OPTIONS_H
+#pragma once
 
 #include <vector>
 
-#include "kudu/server/server_base_options.h"
+#include "kudu/kserver/kserver_options.h"
 #include "kudu/util/net/net_util.h"
 
 namespace kudu {
@@ -28,7 +27,7 @@ namespace master {
 // Options for constructing the master.
 // These are filled in by gflags by default -- see the .cc file for
 // the list of options and corresponding flags.
-struct MasterOptions : public server::ServerBaseOptions {
+struct MasterOptions : public kserver::KuduServerOptions {
   MasterOptions();
 
   std::vector<HostPort> master_addresses;
@@ -38,4 +37,3 @@ struct MasterOptions : public server::ServerBaseOptions {
 
 } // namespace master
 } // namespace kudu
-#endif /* KUDU_MASTER_MASTER_OPTIONS_H */
diff --git a/src/kudu/tablet/tablet_replica.cc b/src/kudu/tablet/tablet_replica.cc
index a41609e..cfad381 100644
--- a/src/kudu/tablet/tablet_replica.cc
+++ b/src/kudu/tablet/tablet_replica.cc
@@ -100,29 +100,26 @@ METRIC_DEFINE_gauge_uint64(tablet, live_row_count, "Tablet Live Row Count",
                            "Number of live rows in this tablet, excludes deleted rows.",
                            kudu::MetricLevel::kInfo);
 
-namespace kudu {
-namespace tablet {
-
-using consensus::ConsensusBootstrapInfo;
-using consensus::ConsensusOptions;
-using consensus::ConsensusRound;
-using consensus::MarkDirtyCallback;
-using consensus::OpId;
-using consensus::PeerProxyFactory;
-using consensus::RaftConfigPB;
-using consensus::RaftPeerPB;
-using consensus::RaftConsensus;
-using consensus::RpcPeerProxyFactory;
-using consensus::ServerContext;
-using consensus::TimeManager;
-using consensus::ALTER_SCHEMA_OP;
-using consensus::PARTICIPANT_OP;
-using consensus::WRITE_OP;
-using log::Log;
-using log::LogAnchorRegistry;
-using pb_util::SecureDebugString;
-using rpc::Messenger;
-using rpc::ResultTracker;
+using kudu::consensus::ALTER_SCHEMA_OP;
+using kudu::consensus::ConsensusBootstrapInfo;
+using kudu::consensus::ConsensusOptions;
+using kudu::consensus::ConsensusRound;
+using kudu::consensus::MarkDirtyCallback;
+using kudu::consensus::OpId;
+using kudu::consensus::PARTICIPANT_OP;
+using kudu::consensus::PeerProxyFactory;
+using kudu::consensus::RaftConfigPB;
+using kudu::consensus::RaftConsensus;
+using kudu::consensus::RaftPeerPB;
+using kudu::consensus::RpcPeerProxyFactory;
+using kudu::consensus::ServerContext;
+using kudu::consensus::TimeManager;
+using kudu::consensus::WRITE_OP;
+using kudu::log::Log;
+using kudu::log::LogAnchorRegistry;
+using kudu::pb_util::SecureDebugString;
+using kudu::rpc::Messenger;
+using kudu::rpc::ResultTracker;
 using std::map;
 using std::shared_ptr;
 using std::string;
@@ -130,6 +127,9 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+namespace kudu {
+namespace tablet {
+
 TabletReplica::TabletReplica(
     scoped_refptr<TabletMetadata> meta,
     scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager,
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index dcab726..637095b 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -197,6 +197,7 @@ DECLARE_int32(workload_stats_metric_collection_interval_ms);
 DECLARE_string(block_manager);
 DECLARE_string(env_inject_eio_globs);
 DECLARE_string(env_inject_full_globs);
+DECLARE_uint32(tablet_apply_pool_overload_threshold_ms);
 
 // Declare these metrics prototypes for simpler unit testing of their behavior.
 METRIC_DECLARE_counter(block_manager_total_bytes_read);
@@ -4365,9 +4366,18 @@ class OpApplyQueueTest : public TabletServerTestBase {
  public:
   // Starts the tablet server, override to start it later.
   void SetUp() override {
+
     // Since scenarios of this test make bursts of requests, set the maximum
     // length of the service queue to accomodate many requests.
     FLAGS_rpc_service_queue_length = 1000;
+
+    // Set the overload threshold for the op apply queue. Since the threshold
+    // is twice the injected latency and the number of operations submitted at
+    // once is much higher than the number of worker threads in the apply
+    // thread pool, the queue will gradually accumulate operations until
+    // becoming overloaded.
+    FLAGS_tablet_apply_pool_overload_threshold_ms = kInjectedLatencyMs;
+
     NO_FATALS(TabletServerTestBase::SetUp());
     NO_FATALS(StartTabletServer(/*num_data_dirs=*/1));
   }
@@ -4376,8 +4386,7 @@ class OpApplyQueueTest : public TabletServerTestBase {
 };
 
 // This is a regression test for KUDU-1587.
-// TODO(aserbin): enable the test once KUDU-1587 is addressed.
-TEST_F(OpApplyQueueTest, DISABLED_ApplyQueueBackpressure) {
+TEST_F(OpApplyQueueTest, ApplyQueueBackpressure) {
   SKIP_IF_SLOW_NOT_ALLOWED();
 
   constexpr size_t kNumCalls = 1000;
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index dd33ecb..3bdf11e 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -52,14 +52,14 @@ namespace kudu {
 namespace tserver {
 
 TabletServer::TabletServer(const TabletServerOptions& opts)
-  : KuduServer("TabletServer", opts, "kudu.tabletserver"),
-    state_(kStopped),
-    quiescing_(false),
-    fail_heartbeats_for_tests_(false),
-    opts_(opts),
-    tablet_manager_(new TSTabletManager(this)),
-    scanner_manager_(new ScannerManager(metric_entity())),
-    path_handlers_(new TabletServerPathHandlers(this)) {
+    : KuduServer("TabletServer", opts, "kudu.tabletserver"),
+      state_(kStopped),
+      quiescing_(false),
+      fail_heartbeats_for_tests_(false),
+      opts_(opts),
+      tablet_manager_(new TSTabletManager(this)),
+      scanner_manager_(new ScannerManager(metric_entity())),
+      path_handlers_(new TabletServerPathHandlers(this)) {
 }
 
 TabletServer::~TabletServer() {
diff --git a/src/kudu/tserver/tablet_server_options.cc b/src/kudu/tserver/tablet_server_options.cc
index 897b824..18c2e40 100644
--- a/src/kudu/tserver/tablet_server_options.cc
+++ b/src/kudu/tserver/tablet_server_options.cc
@@ -27,6 +27,7 @@
 #include "kudu/server/rpc_server.h"
 #include "kudu/tserver/tablet_server.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/monotime.h"
 #include "kudu/util/status.h"
 
 DEFINE_string(tserver_master_addrs, "127.0.0.1:7051",
@@ -36,12 +37,29 @@ DEFINE_string(tserver_master_addrs, "127.0.0.1:7051",
               "using 'rpc_bind_addresses'.");
 TAG_FLAG(tserver_master_addrs, stable);
 
+DEFINE_uint32(tablet_apply_pool_overload_threshold_ms, 0,
+              "The threshold for the queue time of the 'apply' thread pool "
+              "to enter and exit overloaded state. Once the queue stalls and "
+              "its queue times become longer than the specified threshold, it "
+              "enters the overloaded state. Tablet server rejects incoming "
+              "write requests with some probability when its apply queue is "
+              "overloaded. The longer the apply queue stays overloaded, the "
+              "greater the probability of the rejection. In addition, the more "
+              "row operations a write request has, the greater the probablity "
+              "of the rejection. The apply queue exits the overloaded state "
+              "when queue times drop below the specified threshold. Set this "
+              "flag to 0 to disable the behavior described above.");
+TAG_FLAG(tablet_apply_pool_overload_threshold_ms, advanced);
+
 namespace kudu {
 namespace tserver {
 
 TabletServerOptions::TabletServerOptions() {
   rpc_opts.default_port = TabletServer::kDefaultPort;
-
+  if (FLAGS_tablet_apply_pool_overload_threshold_ms > 0) {
+    apply_queue_overload_threshold = MonoDelta::FromMilliseconds(
+        FLAGS_tablet_apply_pool_overload_threshold_ms);
+  }
   Status s = HostPort::ParseStrings(FLAGS_tserver_master_addrs,
                                     master::Master::kDefaultPort,
                                     &master_addresses);
diff --git a/src/kudu/tserver/tablet_server_options.h b/src/kudu/tserver/tablet_server_options.h
index 1ca1389..e823edd 100644
--- a/src/kudu/tserver/tablet_server_options.h
+++ b/src/kudu/tserver/tablet_server_options.h
@@ -14,12 +14,12 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TSERVER_TABLET_SERVER_OPTIONS_H
-#define KUDU_TSERVER_TABLET_SERVER_OPTIONS_H
+
+#pragma once
 
 #include <vector>
 
-#include "kudu/server/server_base_options.h"
+#include "kudu/kserver/kserver_options.h"
 #include "kudu/util/net/net_util.h"
 
 namespace kudu {
@@ -31,7 +31,7 @@ namespace tserver {
 //
 // This allows tests to easily start miniclusters with different
 // tablet servers having different options.
-struct TabletServerOptions : public kudu::server::ServerBaseOptions {
+struct TabletServerOptions : public kserver::KuduServerOptions {
   TabletServerOptions();
 
   std::vector<HostPort> master_addresses;
@@ -39,4 +39,3 @@ struct TabletServerOptions : public kudu::server::ServerBaseOptions {
 
 } // namespace tserver
 } // namespace kudu
-#endif /* KUDU_TSERVER_TABLET_SERVER_OPTIONS_H */
diff --git a/src/kudu/tserver/tablet_server_runner.cc b/src/kudu/tserver/tablet_server_runner.cc
index b4a7205..868e4a1 100644
--- a/src/kudu/tserver/tablet_server_runner.cc
+++ b/src/kudu/tserver/tablet_server_runner.cc
@@ -33,16 +33,18 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/version_info.h"
 
-using gflags::SET_FLAGS_DEFAULT;
-using std::string;
-using std::to_string;
-
 DEFINE_double(fault_before_start, 0.0,
               "Fake fault flag that always causes a crash on startup. "
               "Used to test the test infrastructure. Should never be set outside of tests.");
 TAG_FLAG(fault_before_start, hidden);
 TAG_FLAG(fault_before_start, unsafe);
 
+DECLARE_uint32(tablet_apply_pool_overload_threshold_ms);
+
+using gflags::SET_FLAGS_DEFAULT;
+using std::string;
+using std::to_string;
+
 namespace kudu {
 namespace tserver {
 
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index dc99eca..87ce09d 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -104,9 +104,11 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/pb_util.h"
 #include "kudu/util/process_memory.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
 #include "kudu/util/stopwatch.h"
+#include "kudu/util/threadpool.h"
 #include "kudu/util/trace.h"
 #include "kudu/util/trace_metrics.h"
 
@@ -169,6 +171,14 @@ DECLARE_bool(raft_prepare_replacement_before_eviction);
 DECLARE_int32(memory_limit_warn_threshold_percentage);
 DECLARE_int32(tablet_history_max_age_sec);
 
+METRIC_DEFINE_counter(
+    server,
+    op_apply_queue_overload_rejections,
+    "Number of Rejected Write Requests Due to Queue Overloaded Error",
+    kudu::MetricUnit::kRequests,
+    "Number of rejected write requests due to overloaded op apply queue",
+    kudu::MetricLevel::kWarn);
+
 using google::protobuf::RepeatedPtrField;
 using kudu::consensus::BulkChangeConfigRequestPB;
 using kudu::consensus::ChangeConfigRequestPB;
@@ -1050,8 +1060,11 @@ static size_t GetMaxBatchSizeBytesHint(const ScanRequestPB* req) {
 }
 
 TabletServiceImpl::TabletServiceImpl(TabletServer* server)
-  : TabletServerServiceIf(server->metric_entity(), server->result_tracker()),
-    server_(server) {
+    : TabletServerServiceIf(server->metric_entity(), server->result_tracker()),
+      server_(server),
+      rng_(GetRandomSeed32()) {
+  num_op_apply_queue_rejections_ = server_->metric_entity()->FindOrCreateCounter(
+      &METRIC_op_apply_queue_overload_rejections);
 }
 
 bool TabletServiceImpl::AuthorizeClientOrServiceUser(const google::protobuf::Message* /*req*/,
@@ -1486,6 +1499,30 @@ void TabletServiceImpl::Write(const WriteRequestPB* req,
     return;
   }
 
+  // If the apply queue is overloaded, the write request might be rejected.
+  // The longer the queue was in overloaded state, the higher the probability
+  // of rejecting the request.
+  MonoDelta queue_otime;
+  MonoDelta threshold;
+  if (server_->tablet_apply_pool()->QueueOverloaded(&queue_otime, &threshold)) {
+    DCHECK(threshold.Initialized());
+    DCHECK_GT(threshold.ToMilliseconds(), 0);
+    auto overload_threshold_ms = threshold.ToMilliseconds();
+    // The longer the queue has been in the overloaded state, the higher the
+    // probability of an op to be rejected.
+    auto time_factor = queue_otime.ToMilliseconds() / overload_threshold_ms + 1;
+    if (!rng_.OneIn(time_factor * time_factor + 1)) {
+      static const Status kStatus = Status::ServiceUnavailable(
+          "op apply queue is overloaded");
+      num_op_apply_queue_rejections_->Increment();
+      SetupErrorAndRespond(resp->mutable_error(),
+                           kStatus,
+                           TabletServerErrorPB::THROTTLED,
+                           context);
+      return;
+    }
+  }
+
   unique_ptr<WriteOpState> op_state(new WriteOpState(
       replica.get(),
       req,
diff --git a/src/kudu/tserver/tablet_service.h b/src/kudu/tserver/tablet_service.h
index f12f3aa..2c96835 100644
--- a/src/kudu/tserver/tablet_service.h
+++ b/src/kudu/tserver/tablet_service.h
@@ -22,9 +22,12 @@
 
 #include "kudu/consensus/consensus.service.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/tserver/tserver.pb.h"
 #include "kudu/tserver/tserver_admin.service.h"
 #include "kudu/tserver/tserver_service.service.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/random.h"
 
 namespace boost {
 template <class T> class optional;
@@ -192,6 +195,14 @@ class TabletServiceImpl : public TabletServerServiceIf {
                                 Timestamp* snap_timestamp);
 
   TabletServer* server_;
+
+  // Random generator used to make a decision on the admission of write
+  // operations when the apply queue is overloaded.
+  ThreadSafeRandom rng_;
+
+  // Counter to track number of rejected write requests while op apply queue
+  // was overloaded.
+  scoped_refptr<Counter> num_op_apply_queue_rejections_;
 };
 
 class TabletServiceAdminImpl : public TabletServerAdminServiceIf {