You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/09/02 01:48:13 UTC

[4/4] kudu git commit: KUDU-456 Implement AUTO_FLUSH_BACKGROUND flush mode

KUDU-456 Implement AUTO_FLUSH_BACKGROUND flush mode

Implemented AUTO_FLUSH_BACKGROUND for the Kudu C++ client library.
In AUTO_FLUSH_BACKGROUND mode,
the KuduSession::Apply() method blocks if total amount of data
for pending operations reaches the buffer size limit.  The limit
on the buffer size can be set by the
KuduSession::SetMutationBufferSpace() method.

The background flush logic checks whether at least one of the
following two conditions is satisfied to determine whether it's time
to flush the accumulated write operations:
  * The over-the-watermark criterion: check whether the total size of
    the freshly submitted (i.e. not-yet-scheduled-for-flush) write
    operations is over the threshold.  The threshold can be set as
    the percentage of the total buffer size using the
    KuduSession::SetMutationBufferFlushWatermark() method.
  * The maximum wait time criterion: check whether the current batch
    of operations has been accumulating for more than the maximum
    wait time.  The maximum wait time can be specified in milliseconds
    using the KuduSession::SetMutationBufferFlushInterval() method.
    A KuduSession object uses RPC messenger's thread pool to monitor
    batches' maximum wait time.

Added functionality to control the maximum number of batchers
per session.  If number of batchers is at the limit already,
KuduSession::Apply() blocks until it's possible to add a new batcher
to accommodate the incoming operation.

Modified behavior of the KuduSession::Flush(): now it waits until all
batchers are flushed before returning.

This change also addresses the following JIRA issue:
  KUDU-1376 KuduSession::SetMutationBufferSpace is not defined

Change-Id: I34905c30b3aad96f53cf7a1822b1cde6d25f33a8
Reviewed-on: http://gerrit.cloudera.org:8080/3952
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/93be1310
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/93be1310
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/93be1310

Branch: refs/heads/master
Commit: 93be1310d227cf05025864654ca3f6713c2ddc2c
Parents: da73a2b
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Jul 14 08:59:49 2016 -0700
Committer: Todd Lipcon <to...@apache.org>
Committed: Fri Sep 2 01:47:26 2016 +0000

----------------------------------------------------------------------
 python/kudu/tests/test_client.py    |   8 +-
 src/kudu/client/batcher.cc          |  57 ++-
 src/kudu/client/batcher.h           |  41 +-
 src/kudu/client/client-internal.h   |   1 +
 src/kudu/client/client-test.cc      | 670 +++++++++++++++++++++++++++++--
 src/kudu/client/client.cc           | 112 ++----
 src/kudu/client/client.h            | 139 ++++++-
 src/kudu/client/error_collector.cc  |   6 +-
 src/kudu/client/error_collector.h   |   4 +-
 src/kudu/client/session-internal.cc | 502 +++++++++++++++++++++--
 src/kudu/client/session-internal.h  | 197 +++++++--
 src/kudu/client/write_op.cc         |  10 +-
 src/kudu/client/write_op.h          |   5 +-
 13 files changed, 1505 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/python/kudu/tests/test_client.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index ddc973e..49f413a 100644
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -183,15 +183,11 @@ class TestClient(KuduTestBase, unittest.TestCase):
     def test_session_flush_modes(self):
         self.client.new_session(flush_mode=kudu.FLUSH_MANUAL)
         self.client.new_session(flush_mode=kudu.FLUSH_AUTO_SYNC)
+        self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND)
 
         self.client.new_session(flush_mode='manual')
         self.client.new_session(flush_mode='sync')
-
-        with self.assertRaises(kudu.KuduNotSupported):
-            self.client.new_session(flush_mode=kudu.FLUSH_AUTO_BACKGROUND)
-
-        with self.assertRaises(kudu.KuduNotSupported):
-            self.client.new_session(flush_mode='background')
+        self.client.new_session(flush_mode='background')
 
         with self.assertRaises(ValueError):
             self.client.new_session(flush_mode='foo')

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index fbdb2a9..f80ca8a 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -18,8 +18,6 @@
 #include "kudu/client/batcher.h"
 
 #include <algorithm>
-#include <boost/bind.hpp>
-#include <glog/logging.h>
 #include <memory>
 #include <mutex>
 #include <set>
@@ -28,6 +26,9 @@
 #include <utility>
 #include <vector>
 
+#include <boost/bind.hpp>
+#include <glog/logging.h>
+
 #include "kudu/client/callbacks.h"
 #include "kudu/client/client.h"
 #include "kudu/client/client-internal.h"
@@ -260,11 +261,10 @@ WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
   int ctr = 0;
   RowOperationsPBEncoder enc(requested);
   for (InFlightOp* op : ops_) {
+#ifndef NDEBUG
     const Partition& partition = op->tablet->partition();
     const PartitionSchema& partition_schema = table()->partition_schema();
     const KuduPartialRow& row = op->write_op->row();
-
-#ifndef NDEBUG
     bool partition_contains_row;
     CHECK(partition_schema.PartitionContainsRow(partition, row, &partition_contains_row).ok());
     CHECK(partition_contains_row)
@@ -378,19 +378,18 @@ RetriableRpcStatus WriteRpc::AnalyzeResponse(const Status& rpc_cb_status) {
 }
 
 Batcher::Batcher(KuduClient* client,
-                 ErrorCollector* error_collector,
-                 const sp::shared_ptr<KuduSession>& session,
+                 scoped_refptr<ErrorCollector> error_collector,
+                 sp::weak_ptr<KuduSession> session,
                  kudu::client::KuduSession::ExternalConsistencyMode consistency_mode)
   : state_(kGatheringOps),
     client_(client),
-    weak_session_(session),
+    weak_session_(std::move(session)),
     consistency_mode_(consistency_mode),
-    error_collector_(error_collector),
+    error_collector_(std::move(error_collector)),
     had_errors_(false),
-    flush_callback_(NULL),
+    flush_callback_(nullptr),
     next_op_sequence_number_(0),
     outstanding_lookups_(0),
-    max_buffer_size_(7 * 1024 * 1024),
     buffer_bytes_used_(0) {
 }
 
@@ -468,15 +467,14 @@ void Batcher::CheckForFinishedFlush() {
     // a lock inversion deadlock -- the session lock should always
     // come before the batcher lock.
     session->data_->FlushFinished(this);
-  }
 
-  Status s;
-  if (had_errors_) {
+  }
+  if (flush_callback_) {
     // User is responsible for fetching errors from the error collector.
-    s = Status::IOError("Some errors occurred");
+    Status s = had_errors_ ? Status::IOError("Some errors occurred")
+                           : Status::OK();
+    flush_callback_->Run(s);
   }
-
-  flush_callback_->Run(s);
 }
 
 MonoTime Batcher::ComputeDeadlineUnlocked() const {
@@ -512,20 +510,11 @@ void Batcher::FlushAsync(KuduStatusCallback* cb) {
   FlushBuffersIfReady();
 }
 
-Status Batcher::Add(KuduWriteOperation* write_op) {
-  int64_t required_size = write_op->SizeInBuffer();
-  int64_t size_after_adding = buffer_bytes_used_.IncrementBy(required_size);
-  if (PREDICT_FALSE(size_after_adding > max_buffer_size_)) {
-    buffer_bytes_used_.IncrementBy(-required_size);
-    int64_t size_before_adding = size_after_adding - required_size;
-    return Status::Incomplete(Substitute(
-        "not enough space remaining in buffer for op (required $0, "
-        "$1 already used",
-        HumanReadableNumBytes::ToString(required_size),
-        HumanReadableNumBytes::ToString(size_before_adding)));
-  }
-
+int64_t Batcher::GetOperationSizeInBuffer(KuduWriteOperation* write_op) {
+  return write_op->SizeInBuffer();
+}
 
+Status Batcher::Add(KuduWriteOperation* write_op) {
   // As soon as we get the op, start looking up where it belongs,
   // so that when the user calls Flush, we are ready to go.
   gscoped_ptr<InFlightOp> op(new InFlightOp());
@@ -536,7 +525,6 @@ Status Batcher::Add(KuduWriteOperation* write_op) {
 
   AddInFlightOp(op.get());
   VLOG(3) << "Looking up tablet for " << op->write_op->ToString();
-
   // Increment our reference count for the outstanding callback.
   //
   // deadline_ is set in FlushAsync(), after all Add() calls are done, so
@@ -550,6 +538,9 @@ Status Batcher::Add(KuduWriteOperation* write_op) {
       &op->tablet,
       Bind(&Batcher::TabletLookupFinished, this, op.get()));
   IgnoreResult(op.release());
+
+  buffer_bytes_used_.IncrementBy(write_op->SizeInBuffer());
+
   return Status::OK();
 }
 
@@ -560,6 +551,11 @@ void Batcher::AddInFlightOp(InFlightOp* op) {
   CHECK_EQ(state_, kGatheringOps);
   InsertOrDie(&ops_, op);
   op->sequence_number_ = next_op_sequence_number_++;
+
+  // Set the time of the first operation in the batch, if not set yet.
+  if (PREDICT_FALSE(!first_op_time_.Initialized())) {
+    first_op_time_ = MonoTime::Now();
+  }
 }
 
 bool Batcher::IsAbortedUnlocked() const {
@@ -735,7 +731,6 @@ void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
     MarkHadErrors();
   }
 
-
   // Remove all the ops from the "in-flight" list.
   {
     std::lock_guard<simple_spinlock> l(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/batcher.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.h b/src/kudu/client/batcher.h
index f79324b..9851340 100644
--- a/src/kudu/client/batcher.h
+++ b/src/kudu/client/batcher.h
@@ -58,13 +58,16 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
  public:
   // Create a new batcher associated with the given session.
   //
-  // Any errors which come back from operations performed by this batcher are posted to
-  // the provided ErrorCollector.
+  // Any errors which come back from operations performed by this batcher
+  // are posted to the provided ErrorCollector.
   //
-  // Takes a reference on error_collector. Creates a weak_ptr to 'session'.
+  // Takes a reference on error_collector. Takes a weak_ptr to session -- this
+  // is to break circular dependencies (a session keeps a reference to its
+  // current batcher) and make it possible to call notify a session
+  // (if it's around) from a batcher which does its job using other threads.
   Batcher(KuduClient* client,
-          ErrorCollector* error_collector,
-          const client::sp::shared_ptr<KuduSession>& session,
+          scoped_refptr<ErrorCollector> error_collector,
+          client::sp::weak_ptr<KuduSession> session,
           kudu::client::KuduSession::ExternalConsistencyMode consistency_mode);
 
   // Abort the current batch. Any writes that were buffered and not yet sent are
@@ -80,8 +83,6 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   void SetTimeoutMillis(int millis);
 
   // Add a new operation to the batch. Requires that the batch has not yet been flushed.
-  // TODO: in other flush modes, this may not be the case -- need to
-  // update this when they're implemented.
   //
   // NOTE: If this returns not-OK, does not take ownership of 'write_op'.
   Status Add(KuduWriteOperation* write_op) WARN_UNUSED_RESULT;
@@ -108,6 +109,23 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
     return consistency_mode_;
   }
 
+  // Get time of the first operation in the batch.  If no operations are in
+  // there yet, the returned MonoTime object is not initialized
+  // (i.e. MonoTime::Initialized() returns false).
+  const MonoTime& first_op_time() const {
+    std::lock_guard<simple_spinlock> l(lock_);
+    return first_op_time_;
+  }
+
+  // Return the total size (number of bytes) of all pending write operations
+  // accumulated by the batcher.
+  int64_t buffer_bytes_used() const {
+    return buffer_bytes_used_.Load();
+  }
+
+  // Compute in-buffer size for the given write operation.
+  static int64_t GetOperationSizeInBuffer(KuduWriteOperation* write_op);
+
  private:
   friend class RefCountedThreadSafe<Batcher>;
   friend class WriteRpc;
@@ -166,7 +184,10 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   kudu::client::KuduSession::ExternalConsistencyMode consistency_mode_;
 
   // Errors are reported into this error collector.
-  scoped_refptr<ErrorCollector> const error_collector_;
+  const scoped_refptr<ErrorCollector> error_collector_;
+
+  // The time when the very first operation was added into the batcher.
+  MonoTime first_op_time_;
 
   // Set to true if there was at least one error from this Batcher.
   // Protected by lock_
@@ -203,10 +224,6 @@ class Batcher : public RefCountedThreadSafe<Batcher> {
   // Note: _not_ protected by lock_!
   Atomic32 outstanding_lookups_;
 
-  // The maximum number of bytes of encoded operations which will be allowed to
-  // be buffered.
-  int64_t max_buffer_size_;
-
   // The number of bytes used in the buffer for pending operations.
   AtomicInt<int64_t> buffer_bytes_used_;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/client-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-internal.h b/src/kudu/client/client-internal.h
index e614b62..19e4088 100644
--- a/src/kudu/client/client-internal.h
+++ b/src/kudu/client/client-internal.h
@@ -238,6 +238,7 @@ class KuduClient::Data {
 
   AtomicInt<uint64_t> latest_observed_timestamp_;
 
+ private:
   DISALLOW_COPY_AND_ASSIGN(Data);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 1af7c59..c2306c9 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -15,18 +15,20 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <gtest/gtest.h>
-#include <gflags/gflags.h>
-#include <glog/stl_logging.h>
-
 #include <algorithm>
+#include <functional>
 #include <map>
 #include <memory>
 #include <set>
 #include <string>
+#include <thread>
 #include <utility>
 #include <vector>
 
+#include <gtest/gtest.h>
+#include <gflags/gflags.h>
+#include <glog/stl_logging.h>
+
 #include "kudu/client/callbacks.h"
 #include "kudu/client/client.h"
 #include "kudu/client/client-internal.h"
@@ -34,6 +36,7 @@
 #include "kudu/client/meta_cache.h"
 #include "kudu/client/row_result.h"
 #include "kudu/client/scanner-internal.h"
+#include "kudu/client/session-internal.h"
 #include "kudu/client/value.h"
 #include "kudu/client/write_op.h"
 #include "kudu/common/partial_row.h"
@@ -84,12 +87,16 @@ METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetMasterRegi
 METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTableLocations);
 METRIC_DECLARE_histogram(handler_latency_kudu_master_MasterService_GetTabletLocations);
 
+using std::bind;
+using std::for_each;
+using std::function;
+using std::map;
 using std::pair;
 using std::set;
 using std::string;
+using std::thread;
 using std::unique_ptr;
 using std::vector;
-using std::map;
 using strings::Substitute;
 
 namespace kudu {
@@ -140,6 +147,14 @@ class ClientTest : public KuduTest {
     ASSERT_NO_FATAL_FAILURE(CreateTable(kTableName, 1, GenerateSplitRows(), {}, &client_table_));
   }
 
+  void TearDown() override {
+    if (cluster_) {
+      cluster_->Shutdown();
+      cluster_.reset();
+    }
+    KuduTest::TearDown();
+  }
+
   // Looks up the remote tablet entry for a given partition key in the meta cache.
   scoped_refptr<internal::RemoteTablet> MetaCacheLookup(KuduTable* table,
                                                         const string& partition_key) {
@@ -160,14 +175,6 @@ class ClientTest : public KuduTest {
     return rows;
   }
 
-  void TearDown() override {
-    if (cluster_) {
-      cluster_->Shutdown();
-      cluster_.reset();
-    }
-    KuduTest::TearDown();
-  }
-
   // Count the rows of a table, checking that the operation succeeds.
   //
   // Must be public to use as a thread closure.
@@ -175,6 +182,44 @@ class ClientTest : public KuduTest {
     CHECK_EQ(CountRowsFromClient(table), expected);
   }
 
+  // Continuously sample the total size of the buffer used by pending operations
+  // of the specified KuduSession object.
+  //
+  // Must be public to use as a thread closure.
+  static void MonitorSessionBufferSize(const KuduSession* session,
+                                       CountDownLatch* run_ctl,
+                                       int64_t* result_max_size) {
+    int64_t max_size = 0;
+    while (!run_ctl->WaitFor(MonoDelta::FromMilliseconds(1))) {
+      int size = session->data_->GetPendingOperationsSizeForTests();
+      if (size > max_size) {
+        max_size = size;
+      }
+    }
+    if (result_max_size != nullptr) {
+      *result_max_size = max_size;
+    }
+  }
+
+  // Continuously sample the total count of batchers of the specified
+  // KuduSession object.
+  //
+  // Must be public to use as a thread closure.
+  static void MonitorSessionBatchersCount(const KuduSession* session,
+                                          CountDownLatch* run_ctl,
+                                          size_t* result_max_count) {
+    size_t max_count = 0;
+    while (!run_ctl->WaitFor(MonoDelta::FromMilliseconds(1))) {
+      size_t count = session->data_->GetBatchersCountForTests();
+      if (count > max_count) {
+        max_count = count;
+      }
+    }
+    if (result_max_count != nullptr) {
+      *result_max_count = max_count;
+    }
+  }
+
  protected:
 
   static const char *kTableName;
@@ -509,6 +554,32 @@ class ClientTest : public KuduTest {
 
   void DoApplyWithoutFlushTest(int sleep_micros);
 
+  // Wait for the operations to be flushed when running the session in
+  // AUTO_FLUSH_BACKGROUND mode. In most scenarios, operations should be
+  // flushed after the maximum wait time. Adding an extra 5x multiplier
+  // due to other OS activity and slow runs under TSAN to avoid flakiness.
+  void WaitForAutoFlushBackground(const shared_ptr<KuduSession>& session,
+                                  int32_t flush_interval_ms) {
+    const int32_t kMaxWaitMs = 5 * flush_interval_ms;
+
+    const MonoTime now(MonoTime::Now());
+    const MonoTime deadline(now + MonoDelta::FromMilliseconds(kMaxWaitMs));
+    for (MonoTime t = now; session->HasPendingOperations() && t < deadline;
+       t = MonoTime::Now()) {
+      SleepFor(MonoDelta::FromMilliseconds(flush_interval_ms / 2));
+    }
+  }
+
+  // Measure how much time a batch of insert operations with
+  // the specified parameters takes to reach the tablet server if running
+  // ApplyInsertToSession() in the specified flush mode.
+  void TimeInsertOpBatch(KuduSession::FlushMode mode,
+                         size_t buffer_size,
+                         size_t run_idx,
+                         size_t run_num,
+                         const vector<size_t>& string_sizes,
+                         CpuTimes* elapsed);
+
   enum WhichServerToKill {
     DEAD_MASTER,
     DEAD_TSERVER
@@ -1143,7 +1214,7 @@ TEST_F(ClientTest, TestNonCoveringRangePartitions) {
     bool overflowed;
     session->GetPendingErrors(&errors, &overflowed);
     EXPECT_FALSE(overflowed);
-    EXPECT_EQ(1, errors.size());
+    ASSERT_EQ(1, errors.size());
     EXPECT_TRUE(errors[0]->status().IsNotFound());
   }
 
@@ -1283,7 +1354,7 @@ TEST_F(ClientTest, TestExclusiveInclusiveRangeBounds) {
     bool overflowed;
     session->GetPendingErrors(&errors, &overflowed);
     EXPECT_FALSE(overflowed);
-    EXPECT_EQ(1, errors.size());
+    ASSERT_EQ(1, errors.size());
     EXPECT_TRUE(errors[0]->status().IsNotFound());
   }
 
@@ -1989,13 +2060,6 @@ TEST_F(ClientTest, TestBatchWithPartialError) {
             "int32 non_null_with_default=12345)", rows[1]);
 }
 
-// Test flushing an empty batch (should be a no-op).
-TEST_F(ClientTest, TestEmptyBatch) {
-  shared_ptr<KuduSession> session = client_->NewSession();
-  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
-  FlushSessionOrDie(session);
-}
-
 void ClientTest::DoTestWriteWithDeadServer(WhichServerToKill which) {
   shared_ptr<KuduSession> session = client_->NewSession();
   session->SetTimeoutMillis(1000);
@@ -2074,21 +2138,23 @@ TEST_F(ClientTest, TestApplyToSessionWithoutFlushing_OpsBuffered) {
   DoApplyWithoutFlushTest(10000);
 }
 
-// Apply a large amount of data without calling Flush(), and ensure
-// that we get an error on Apply() rather than sending a too-large
+// Apply a large amount of data without calling Flush() in MANUAL_FLUSH mode,
+// and ensure that we get an error on Apply() rather than sending a too-large
 // RPC to the server.
 TEST_F(ClientTest, TestApplyTooMuchWithoutFlushing) {
 
   // Applying a bunch of small rows without a flush should result
   // in an error.
   {
+    const size_t kBufferSizeBytes = 1024 * 1024;
     bool got_expected_error = false;
     shared_ptr<KuduSession> session = client_->NewSession();
     ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
-    for (int i = 0; i < 1000000; i++) {
+    ASSERT_OK(session->SetMutationBufferSpace(kBufferSizeBytes));
+    for (int i = 0; i < kBufferSizeBytes; i++) {
       Status s = ApplyInsertToSession(session.get(), client_table_, 1, 1, "x");
       if (s.IsIncomplete()) {
-        ASSERT_STR_CONTAINS(s.ToString(), "not enough space remaining in buffer");
+        ASSERT_STR_CONTAINS(s.ToString(), "not enough mutation buffer space");
         got_expected_error = true;
         break;
       } else {
@@ -2096,16 +2162,548 @@ TEST_F(ClientTest, TestApplyTooMuchWithoutFlushing) {
       }
     }
     ASSERT_TRUE(got_expected_error);
+    EXPECT_TRUE(session->HasPendingOperations());
   }
+}
+
+// Applying a big operation which does not fit into the buffer should
+// return an error with session running in any supported flush mode.
+TEST_F(ClientTest, TestCheckMutationBufferSpaceLimitInEffect) {
+  const size_t kBufferSizeBytes = 256;
+  const string kLongString(kBufferSizeBytes + 1, 'x');
+  const KuduSession::FlushMode kFlushModes[] = {
+    KuduSession::AUTO_FLUSH_BACKGROUND,
+    KuduSession::AUTO_FLUSH_SYNC,
+    KuduSession::MANUAL_FLUSH,
+  };
 
-  // Writing a single very large row should also result in an error.
+  for (auto mode : kFlushModes) {
+    Status s;
+    shared_ptr<KuduSession> session(client_->NewSession());
+    ASSERT_OK(session->SetFlushMode(mode));
+    ASSERT_FALSE(session->HasPendingOperations());
+
+    ASSERT_OK(session->SetMutationBufferSpace(kBufferSizeBytes));
+    s = ApplyInsertToSession(
+          session.get(), client_table_, 0, 1, kLongString.c_str());
+    ASSERT_TRUE(s.IsIncomplete()) << "Got unexpected status: " << s.ToString();
+    EXPECT_FALSE(session->HasPendingOperations());
+    vector<KuduError*> errors;
+    ElementDeleter deleter(&errors);
+    bool overflowed;
+    session->GetPendingErrors(&errors, &overflowed);
+    EXPECT_FALSE(overflowed);
+    ASSERT_EQ(1, errors.size());
+    EXPECT_TRUE(errors[0]->status().IsIncomplete());
+    EXPECT_EQ(s.ToString(), errors[0]->status().ToString());
+  }
+}
+
+// For a KuduSession object, it should be OK to switch between flush modes
+// in the middle if there is no pending operations in the buffer.
+TEST_F(ClientTest, TestSwitchFlushModes) {
+  const size_t kBufferSizeBytes = 256;
+  const string kLongString(kBufferSizeBytes / 2, 'x');
+  const int32_t kFlushIntervalMs = 100;
+
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(session->SetMutationBufferSpace(kBufferSizeBytes));
+  // Start with the AUTO_FLUSH_SYNC mode.
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+  ASSERT_OK(session->SetMutationBufferFlushInterval(kFlushIntervalMs));
+  ASSERT_FALSE(session->HasPendingOperations());
+
+  ASSERT_OK(ApplyInsertToSession(
+      session.get(), client_table_, 0, 1, kLongString.c_str()));
+  // No pending ops: flush should happen synchronously during the Apply() call.
+  ASSERT_FALSE(session->HasPendingOperations());
+
+  // Switch to the MANUAL_FLUSH mode.
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  ASSERT_OK(ApplyInsertToSession(
+      session.get(), client_table_, 1, 2, kLongString.c_str()));
+  ASSERT_TRUE(session->HasPendingOperations());
+  ASSERT_OK(session->Flush());
+  ASSERT_FALSE(session->HasPendingOperations());
+
+  // Switch to the AUTO_FLUSH_BACKGROUND mode.
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+  ASSERT_OK(ApplyInsertToSession(
+      session.get(), client_table_, 2, 3, kLongString.c_str()));
+  ASSERT_OK(session->Flush());
+  // There should be no pending ops: the background flusher should do its job.
+  ASSERT_FALSE(session->HasPendingOperations());
+
+  // Switch back to the MANUAL_FLUSH mode.
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH);
+  ASSERT_OK(ApplyInsertToSession(
+      session.get(), client_table_, 4, 5, kLongString.c_str()));
+  WaitForAutoFlushBackground(session, kFlushIntervalMs);
+  // There should be some pending ops: the automatic background flush
+  // should not be active after switch to the MANUAL_FLUSH mode.
+  ASSERT_TRUE(session->HasPendingOperations());
+  ASSERT_OK(session->Flush()));
+  ASSERT_FALSE(session->HasPendingOperations());
+}
+
+void ClientTest::TimeInsertOpBatch(
+    KuduSession::FlushMode mode,
+    size_t buffer_size,
+    size_t run_idx,
+    size_t run_num,
+    const vector<size_t>& string_sizes,
+    CpuTimes* elapsed) {
+
+  string mode_str = "unknown";
+  switch (mode) {
+    case KuduSession::AUTO_FLUSH_BACKGROUND:
+      mode_str = "AUTO_FLUSH_BACKGROND";
+      break;
+    case KuduSession::AUTO_FLUSH_SYNC:
+      mode_str = "AUTO_FLUSH_SYNC";
+      break;
+    case KuduSession::MANUAL_FLUSH:
+      mode_str = "MANUAL_FLUSH";
+      break;
+  }
+
+  const size_t row_num = string_sizes.size();
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(session->SetMutationBufferSpace(buffer_size));
+  ASSERT_OK(session->SetFlushMode(mode));
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  LOG_TIMING(INFO, "Running in " + mode_str + " mode") {
+    sw.start();
+    for (size_t i = 0; i < row_num; ++i) {
+      const string long_string(string_sizes[i], '0');
+      const size_t idx = run_num * i + run_idx;
+      ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, idx, idx,
+                                     long_string.c_str()));
+    }
+    EXPECT_OK(session->Flush());
+    sw.stop();
+  }
+  ASSERT_EQ(0, session->CountPendingErrors());
+  if (elapsed != nullptr) {
+    *elapsed = sw.elapsed();
+  }
+}
+
+// Test for acceptable values of the maximum number of batchers for KuduSession.
+TEST_F(ClientTest, TestSetSessionMutationBufferMaxNum) {
+  shared_ptr<KuduSession> session(client_->NewSession());
+  // The default for the maximum number of batchers.
+  EXPECT_OK(session->SetMutationBufferMaxNum(2));
+  // Check for minimum acceptable limit for number of batchers.
+  EXPECT_OK(session->SetMutationBufferMaxNum(1));
+  // Unlimited number of batchers.
+  EXPECT_OK(session->SetMutationBufferMaxNum(0));
+  // Non-default acceptable number of batchers.
+  EXPECT_OK(session->SetMutationBufferMaxNum(8));
+  // Check it's impossible to update maximum number of batchers if there are
+  // pending operations.
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 0, 0, "x"));
+  ASSERT_EQ(1, session->CountBufferedOperations());
+  ASSERT_EQ(0, session->CountPendingErrors());
+  ASSERT_TRUE(session->HasPendingOperations());
+  Status s = session->SetMutationBufferMaxNum(3);
+  ASSERT_TRUE(s.IsIllegalState());
+  ASSERT_STR_CONTAINS(s.ToString(),
+                      "Cannot change the limit on maximum number of batchers");
+  ASSERT_OK(session->Flush());
+  ASSERT_EQ(0, session->CountPendingErrors());
+  ASSERT_FALSE(session->HasPendingOperations());
+}
+
+// Check that call to Flush()/FlushAsync() is safe if there isn't current
+// batcher for the session (i.e., no error is expected on calling those).
+TEST_F(ClientTest, TestFlushNoCurrentBatcher) {
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(session->SetMutationBufferMaxNum(1));
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+  ASSERT_FALSE(session->HasPendingOperations());
+  ASSERT_EQ(0, session->CountPendingErrors());
+  Synchronizer sync0;
+  KuduStatusMemberCallback<Synchronizer> scbk0(&sync0, &Synchronizer::StatusCB);
+  // No current batcher since nothing has been applied yet.
+  session->FlushAsync(&scbk0);
+  ASSERT_OK(sync0.Wait());
+  // The same with the synchronous flush.
+  ASSERT_OK(session->Flush());
+  ASSERT_FALSE(session->HasPendingOperations());
+  ASSERT_EQ(0, session->CountPendingErrors());
+
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 0, 0, "0"));
+  ASSERT_TRUE(session->HasPendingOperations());
+  ASSERT_EQ(1, session->CountBufferedOperations());
+  ASSERT_EQ(0, session->CountPendingErrors());
+  // OK, now the current batcher should be at place.
+  ASSERT_OK(session->Flush());
+  ASSERT_FALSE(session->HasPendingOperations());
+  ASSERT_EQ(0, session->CountPendingErrors());
+
+  // Once more: now there isn't current batcher again.
+  ASSERT_FALSE(session->HasPendingOperations());
+  ASSERT_EQ(0, session->CountPendingErrors());
+  Synchronizer sync1;
+  KuduStatusMemberCallback<Synchronizer> scbk1(&sync1, &Synchronizer::StatusCB);
+  session->FlushAsync(&scbk1);
+  ASSERT_OK(sync1.Wait());
+  // The same with the synchronous flush.
+  ASSERT_OK(session->Flush());
+  ASSERT_FALSE(session->HasPendingOperations());
+  ASSERT_EQ(0, session->CountPendingErrors());
+}
+
+// Check that the limit on maximum number of batchers per KuduSession
+// is enforced. KuduSession::Apply() should block if called when the number
+// of batchers is at the limit already.
+TEST_F(ClientTest, TestSessionMutationBufferMaxNum) {
+  const size_t kBufferSizeBytes = 1024;
+  const size_t kBufferMaxLimit = 8;
+  for (size_t limit = 1; limit < kBufferMaxLimit; ++limit) {
+    shared_ptr<KuduSession> session(client_->NewSession());
+    ASSERT_OK(session->SetMutationBufferSpace(kBufferSizeBytes));
+    ASSERT_OK(session->SetMutationBufferMaxNum(limit));
+    ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+
+    size_t monitor_max_batchers_count = 0;
+    CountDownLatch monitor_run_ctl(1);
+    thread monitor(bind(&ClientTest::MonitorSessionBatchersCount, session.get(),
+                        &monitor_run_ctl, &monitor_max_batchers_count));
+
+    // Apply a big number of tiny operations, flushing after each to utilize
+    // maximum possible number of session's batchers.
+    for (size_t i = 0; i < limit * 16; ++i) {
+      ASSERT_OK(ApplyInsertToSession(session.get(), client_table_,
+                                     kBufferMaxLimit * i + limit,
+                                     kBufferMaxLimit * i + limit, "x"));
+      session->FlushAsync(nullptr);
+    }
+    EXPECT_OK(session->Flush());
+
+    monitor_run_ctl.CountDown();
+    monitor.join();
+    EXPECT_GE(limit, monitor_max_batchers_count);
+  }
+}
+
+// A test scenario to compare rate of submission of small write operations
+// in AUTO_FLUSH and AUTO_FLUSH_BACKGROUND mode; all the operations have
+// the same pre-defined size.
+TEST_F(ClientTest, TestFlushModesCompareOpRatesFixedSize) {
+  const size_t kBufferSizeBytes = 32 * 1024;
+  const size_t kRowNum = 4 * 1024;
+
+  vector<size_t> str_sizes(kRowNum, kBufferSizeBytes / 8);
+  CpuTimes t_afb;
+  TimeInsertOpBatch(KuduSession::AUTO_FLUSH_BACKGROUND,
+                    kBufferSizeBytes, 0, 2, str_sizes,
+                    &t_afb);
+  CpuTimes t_afs;
+  TimeInsertOpBatch(KuduSession::AUTO_FLUSH_SYNC,
+                    kBufferSizeBytes, 1, 2, str_sizes,
+                    &t_afs);
+  // AUTO_FLUSH_BACKGROUND should be faster than AUTO_FLUSH_SYNC.
+  EXPECT_GT(t_afs.wall, t_afb.wall);
+}
+
+// A test scenario to compare rate of submission of small write operations
+// in AUTO_FLUSH and AUTO_FLUSH_BACKGROUND mode with operations of random size.
+TEST_F(ClientTest, TestFlushModesCompareOpRatesRandomSize) {
+  const size_t kBufferSizeBytes = 32 * 1024;
+  const size_t kRowNum = 4 * 1024;
+
+  SeedRandom();
+  vector<size_t> str_sizes(kRowNum);
+  for (size_t i = 0; i < kRowNum; ++i) {
+    str_sizes[i] = rand() % (kBufferSizeBytes / 8);
+  }
+  CpuTimes t_afb;
+  TimeInsertOpBatch(KuduSession::AUTO_FLUSH_BACKGROUND,
+                    kBufferSizeBytes, 0, 2, str_sizes,
+                    &t_afb);
+  CpuTimes t_afs;
+  TimeInsertOpBatch(KuduSession::AUTO_FLUSH_SYNC,
+                    kBufferSizeBytes, 1, 2, str_sizes,
+                    &t_afs);
+  // AUTO_FLUSH_BACKGROUND should be faster than AUTO_FLUSH_SYNC.
+  EXPECT_GT(t_afs.wall, t_afb.wall);
+}
+
+// A test scenario for AUTO_FLUSH_BACKGROUND mode:
+// applying a bunch of small rows without a flush should not result in
+// an error, even with low limit on the buffer space. This is because
+// Session::Apply() blocks and waits for buffer space to become available
+// if it cannot add the operation into the buffer right away.
+TEST_F(ClientTest, TestAutoFlushBackgroundSmallOps) {
+  const size_t kBufferSizeBytes = 1024;
+  const size_t kRowNum = kBufferSizeBytes * 10;
+  const int32_t kFlushIntervalMs = 100;
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(session->SetMutationBufferSpace(kBufferSizeBytes));
+  ASSERT_OK(session->SetMutationBufferFlushInterval(kFlushIntervalMs));
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+
+  int64_t monitor_max_buffer_size = 0;
+  CountDownLatch monitor_run_ctl(1);
+  thread monitor(bind(&ClientTest::MonitorSessionBufferSize, session.get(),
+                      &monitor_run_ctl, &monitor_max_buffer_size));
+
+  for (size_t i = 0; i < kRowNum; ++i) {
+    ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, i, i, "x"));
+  }
+  EXPECT_OK(session->Flush());
+  EXPECT_EQ(0, session->CountPendingErrors());
+  EXPECT_FALSE(session->HasPendingOperations());
+
+  monitor_run_ctl.CountDown();
+  monitor.join();
+  // Check that the limit has not been overrun.
+  EXPECT_GE(kBufferSizeBytes, monitor_max_buffer_size);
+  // Check that all rows have reached the table.
+  EXPECT_EQ(kRowNum, CountRowsFromClient(client_table_.get()));
+}
+
+// A test scenario for AUTO_FLUSH_BACKGROUND mode:
+// applying a bunch of rows every one of which is so big in size that
+// a couple of those do not fit into the buffer. This should be OK:
+// Session::Apply() must manage this case as well, blocking on an attempt to add
+// another operation if buffer is about to overflow (no error, though).
+// Once last operation is flushed out of the buffer, Session::Apply() should
+// unblock and work with next operation, and so on.
+TEST_F(ClientTest, TestAutoFlushBackgroundBigOps) {
+  const size_t kBufferSizeBytes = 32 * 1024;
+  const int32_t kFlushIntervalMs = 100;
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(session->SetMutationBufferSpace(kBufferSizeBytes));
+  ASSERT_OK(session->SetMutationBufferFlushInterval(kFlushIntervalMs));
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+
+  int64_t monitor_max_buffer_size = 0;
+  CountDownLatch monitor_run_ctl(1);
+  thread monitor(bind(&ClientTest::MonitorSessionBufferSize, session.get(),
+                      &monitor_run_ctl, &monitor_max_buffer_size));
+
+  // Starting from i == 3: this is the lowest i when
+  // ((i - 1) * kBufferSizeBytes / i) has a value greater than
+  // kBufferSizeBytes / 2. We want a pair of operations that both don't fit
+  // into the buffer, but every individual one does.
+  const size_t kRowIdxBeg = 3;
+  const size_t kRowIdxEnd = 256;
+  for (size_t i = kRowIdxBeg; i < kRowIdxEnd; ++i) {
+    const string long_string((i - 1) * kBufferSizeBytes / i, 'x');
+    ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, i, i,
+                                   long_string.c_str()));
+  }
+  EXPECT_OK(session->Flush());
+  EXPECT_EQ(0, session->CountPendingErrors());
+  EXPECT_FALSE(session->HasPendingOperations());
+
+  monitor_run_ctl.CountDown();
+  monitor.join();
+  EXPECT_GE(kBufferSizeBytes, monitor_max_buffer_size);
+  // Check that all rows have reached the table.
+  EXPECT_EQ(kRowIdxEnd - kRowIdxBeg, CountRowsFromClient(client_table_.get()));
+}
+
+// A test scenario for AUTO_FLUSH_BACKGROUND mode:
+// interleave write operations of random lenth.
+// Every single operation fits into the buffer; no error is expected.
+TEST_F(ClientTest, TestAutoFlushBackgroundRandomOps) {
+  const size_t kBufferSizeBytes = 1024;
+  const size_t kRowNum = 512;
+  const int32_t kFlushIntervalMs = 1000;
+
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(session->SetMutationBufferSpace(kBufferSizeBytes));
+  ASSERT_OK(session->SetMutationBufferFlushInterval(kFlushIntervalMs));
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+
+  SeedRandom();
+  int64_t monitor_max_buffer_size = 0;
+  CountDownLatch monitor_run_ctl(1);
+  thread monitor(bind(&ClientTest::MonitorSessionBufferSize, session.get(),
+                      &monitor_run_ctl, &monitor_max_buffer_size));
+
+  for (size_t i = 0; i < kRowNum; ++i) {
+    // Every operation takes less than 2/3 of the buffer space, so no
+    // error on 'operation size is bigger than the limit' is expected.
+    const string long_string(rand() % (2 * kBufferSizeBytes / 3), 'x');
+    ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, i, i,
+                                   long_string.c_str()));
+  }
+  EXPECT_OK(session->Flush());
+  EXPECT_EQ(0, session->CountPendingErrors());
+  EXPECT_FALSE(session->HasPendingOperations());
+
+  monitor_run_ctl.CountDown();
+  monitor.join();
+  EXPECT_GE(kBufferSizeBytes, monitor_max_buffer_size);
+  // Check that all rows have reached the table.
+  EXPECT_EQ(kRowNum, CountRowsFromClient(client_table_.get()));
+}
+
+// Test that in AUTO_FLUSH_BACKGROUND mode even a small amount of tiny
+// operations are put into the queue and flushed after some time.
+// Since the buffer size limit is relatively huge compared with the total size
+// of operations getting into the buffer, the high-watermark flush criteria
+// is not going to be met and the timer expiration criteria starts playing here.
+TEST_F(ClientTest, TestAutoFlushBackgroundFlushTimeout) {
+  const int32_t kFlushIntervalMs = 200;
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(session->SetMutationBufferSpace(1 * 1024 * 1024));
+  ASSERT_OK(session->SetMutationBufferFlushInterval(kFlushIntervalMs));
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+  ASSERT_OK(ApplyInsertToSession(session.get(), client_table_, 0, 0, "x"));
+  ASSERT_TRUE(session->HasPendingOperations());
+  WaitForAutoFlushBackground(session, kFlushIntervalMs);
+  EXPECT_EQ(0, session->CountPendingErrors());
+  EXPECT_FALSE(session->HasPendingOperations());
+  // Check that all rows have reached the table.
+  EXPECT_EQ(1, CountRowsFromClient(client_table_.get()));
+}
+
+// Test that in AUTO_FLUSH_BACKGROUND mode applying two big operations in a row
+// works without unnecessary delay in the case illustrated by the diagram below.
+//
+//                                          +-------------------+
+//                                          |                   |
+//                                          | Data of the next  |
+//                   +---buffer_limit----+  | operation to add. |
+// flush watermark ->|                   |  |                   |
+//                   +-------------------+  +---------0---------+
+//                   |                   |
+//                   | Data of the first |
+//                   | operation.        |
+//                   |                   |
+//                   +---------0---------+
+TEST_F(ClientTest, TestAutoFlushBackgroundPreFlush) {
+  const size_t kBufferSizeBytes = 1024;
+  const size_t kStringLenBytes = 2 * kBufferSizeBytes / 3;
+  const int32 kFlushIntervalMs = 5000;
+
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(session->SetMutationBufferSpace(kBufferSizeBytes));
+  ASSERT_OK(session->SetMutationBufferFlushWatermark(0.99)); // 99%
+  ASSERT_OK(session->SetMutationBufferFlushInterval(kFlushIntervalMs));
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+
+  for (size_t i = 0; i < 2; ++i) {
+    unique_ptr<KuduInsert> insert(client_table_->NewInsert());
+    const string str(kStringLenBytes, '0' + i);
+    ASSERT_OK(insert->mutable_row()->SetInt32("key", i));
+    ASSERT_OK(insert->mutable_row()->SetInt32("int_val", i));
+    ASSERT_OK(insert->mutable_row()->SetStringCopy("string_val", str));
+
+    Stopwatch sw(Stopwatch::ALL_THREADS);
+    sw.start();
+    ASSERT_OK(session->Apply(insert.release()));
+    sw.stop();
+
+    ASSERT_TRUE(session->HasPendingOperations());
+
+    // The first Apply() call must not block because the buffer was empty
+    // prior to adding the first operation's data.
+    //
+    // The second Apply() should go through fast because a pre-flush
+    // should happen before adding the data of the second operation even if
+    // the flush watermark hasn't been reached with the first operation's data.
+    // For details on this behavior please see the diagram in the body of the
+    // KuduSession::Data::ApplyWriteOp() method.
+    EXPECT_GT(kFlushIntervalMs / 10,
+              static_cast<int32>(sw.elapsed().wall_millis()));
+  }
+  ASSERT_OK(session->Flush());
+
+  // At this point nothing should be in the buffer.
+  ASSERT_EQ(0, session->CountPendingErrors());
+  ASSERT_FALSE(session->HasPendingOperations());
+  // Check that all rows have reached the table.
+  EXPECT_EQ(2, CountRowsFromClient(client_table_.get()));
+}
+
+// Test that KuduSession::Apply() call blocks in AUTO_FLUSH_BACKGROUND mode
+// if the write operation/mutation buffer does not have enough space
+// to accommodate an incoming write operation.
+//
+// The test scenario uses the combination of watermark level
+// settings and the 'flush by timeout' behavior. The idea is the following:
+//
+//   a. Set the high-watermark level to 100% of the buffer size limit.
+//      This setting and the fact that Apply() does not allow operations' data
+//      to accumulate over the size of the buffer
+//      guarantees that the high-watermark event will not trigger.
+//   b. Set the timeout for the flush to be high enough to distinguish
+//      between occasional delays due to other OS activity and waiting
+//      on the invocation of the blocking Apply() method.
+//   c. Try to add two write operations each of 2/3 of the buffer space in size.
+//      The first Apply() call should succeed instantly, but the second one
+//      should block.
+//   d. Measure how much time it took to return from the second invocation
+//      of the Apply() call: it should be close to the wait timeout.
+//
+TEST_F(ClientTest, TestAutoFlushBackgroundApplyBlocks) {
+  const size_t kBufferSizeBytes = 8 * 1024;
+  const size_t kStringLenBytes = 2 * kBufferSizeBytes / 3;
+  const int32 kFlushIntervalMs = 1000;
+
+  shared_ptr<KuduSession> session(client_->NewSession());
+  ASSERT_OK(session->SetMutationBufferSpace(kBufferSizeBytes));
+  ASSERT_OK(session->SetMutationBufferFlushWatermark(1.0));
+  session->data_->buffer_pre_flush_enabled_ = false;
+  ASSERT_OK(session->SetMutationBufferFlushInterval(kFlushIntervalMs));
+  ASSERT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
+
+  const int32_t wait_timeout_ms = kFlushIntervalMs;
   {
-    string huge_string(10 * 1024 * 1024, 'x');
+    unique_ptr<KuduInsert> insert(client_table_->NewInsert());
+    const string str(kStringLenBytes, '0');
+    ASSERT_OK(insert->mutable_row()->SetInt32("key", 0));
+    ASSERT_OK(insert->mutable_row()->SetInt32("int_val", 0));
+    ASSERT_OK(insert->mutable_row()->SetStringCopy("string_val", str));
+
+    Stopwatch sw(Stopwatch::ALL_THREADS);
+    sw.start();
+    ASSERT_OK(session->Apply(insert.release()));
+    sw.stop();
 
-    shared_ptr<KuduSession> session = client_->NewSession();
-    Status s = ApplyInsertToSession(session.get(), client_table_, 1, 1, huge_string.c_str());
-    ASSERT_TRUE(s.IsIncomplete()) << "got unexpected status: " << s.ToString();
+    ASSERT_TRUE(session->HasPendingOperations());
+
+    // The first Apply() call must not block, so the time spent on calling it
+    // should be at least one order of magnitude less than the periodic flush
+    // interval.
+    EXPECT_GT(wait_timeout_ms / 10, sw.elapsed().wall_millis());
+  }
+  Stopwatch sw(Stopwatch::ALL_THREADS);
+  {
+    unique_ptr<KuduInsert> insert(client_table_->NewInsert());
+    const string str(kStringLenBytes, '1');
+    ASSERT_OK(insert->mutable_row()->SetInt32("key", 1));
+    ASSERT_OK(insert->mutable_row()->SetInt32("int_val", 1));
+    ASSERT_OK(insert->mutable_row()->SetStringCopy("string_val", str));
+
+    // The second Apply() call must block until the flusher pushes already
+    // scheduled operation out of the buffer. The second Apply() call should
+    // unblock as soon as flusher triggers purging buffered operations
+    // by timeout.
+    sw.start();
+    ASSERT_OK(session->Apply(insert.release()));
+    sw.stop();
   }
+  ASSERT_OK(session->Flush());
+
+  // At this point nothing should be in the buffer.
+  ASSERT_EQ(0, session->CountPendingErrors());
+  ASSERT_FALSE(session->HasPendingOperations());
+  // Check that all rows have reached the table.
+  EXPECT_EQ(2, CountRowsFromClient(client_table_.get()));
+
+  // Check that t_diff_ms is close enough to wait_time_ms.
+  EXPECT_GT(wait_timeout_ms + wait_timeout_ms / 2, sw.elapsed().wall_millis());
+  EXPECT_LT(wait_timeout_ms / 2, sw.elapsed().wall_millis());
 }
 
 // Test that update updates and delete deletes with expected use
@@ -3131,8 +3729,8 @@ TEST_F(ClientTest, TestBatchScanConstIterator) {
 
   {
     // Insert a few rows
-    const int ROW_NUM = 2;
-    ASSERT_NO_FATAL_FAILURE(InsertTestRows(client_table_.get(), ROW_NUM));
+    const int kRowNum = 2;
+    ASSERT_NO_FATAL_FAILURE(InsertTestRows(client_table_.get(), kRowNum));
 
     KuduScanner scanner(client_table_.get());
     ASSERT_OK(scanner.Open());
@@ -3142,7 +3740,7 @@ TEST_F(ClientTest, TestBatchScanConstIterator) {
     ASSERT_TRUE(scanner.HasMoreRows());
     ASSERT_OK(scanner.NextBatch(&batch));
     const int ref_count(batch.NumRows());
-    ASSERT_EQ(ROW_NUM, ref_count);
+    ASSERT_EQ(kRowNum, ref_count);
 
     {
       KuduScanBatch::const_iterator it_next = batch.begin();
@@ -3155,7 +3753,7 @@ TEST_F(ClientTest, TestBatchScanConstIterator) {
 
     {
       KuduScanBatch::const_iterator it_end = batch.begin();
-      std::advance(it_end, ROW_NUM);
+      std::advance(it_end, kRowNum);
       ASSERT_TRUE(batch.end() == it_end);
     }
 
@@ -3206,7 +3804,7 @@ TEST_F(ClientTest, TestBatchScanConstIterator) {
       ASSERT_TRUE(other_scanner.HasMoreRows());
       ASSERT_OK(other_scanner.NextBatch(&other_batch));
       const int other_ref_count(other_batch.NumRows());
-      ASSERT_EQ(ROW_NUM, other_ref_count);
+      ASSERT_EQ(kRowNum, other_ref_count);
 
       KuduScanBatch::const_iterator it(batch.begin());
       KuduScanBatch::const_iterator other_it(other_batch.begin());

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 739c536..dfefff8 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -18,15 +18,15 @@
 #include "kudu/client/client.h"
 
 #include <algorithm>
-#include <boost/bind.hpp>
 #include <memory>
-#include <mutex>
 #include <set>
 #include <string>
 #include <unordered_map>
 #include <utility>
 #include <vector>
 
+#include <boost/bind.hpp>
+
 #include "kudu/client/batcher.h"
 #include "kudu/client/callbacks.h"
 #include "kudu/client/client-internal.h"
@@ -53,10 +53,10 @@
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/substitute.h"
-#include "kudu/master/master.h" // TODO: remove this include - just needed for default port
 #include "kudu/master/master.pb.h"
 #include "kudu/master/master.proxy.h"
 #include "kudu/rpc/messenger.h"
+#include "kudu/rpc/request_tracker.h"
 #include "kudu/util/init.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/net/dns_resolver.h"
@@ -318,8 +318,7 @@ Status KuduClient::ListTabletServers(vector<KuduTabletServer*>* tablet_servers)
     HostPort hp;
     RETURN_NOT_OK(HostPortFromPB(e.registration().rpc_addresses(0), &hp));
     unique_ptr<KuduTabletServer> ts(new KuduTabletServer);
-    ts->data_ = new KuduTabletServer::Data(e.instance_id().permanent_uuid(),
-                                           hp);
+    ts->data_ = new KuduTabletServer::Data(e.instance_id().permanent_uuid(), hp);
     tablet_servers->push_back(ts.release());
   }
   return Status::OK();
@@ -700,7 +699,7 @@ KuduError::~KuduError() {
 ////////////////////////////////////////////////////////////
 
 KuduSession::KuduSession(const shared_ptr<KuduClient>& client)
-  : data_(new KuduSession::Data(client)) {
+  : data_(new KuduSession::Data(client, client->data_->messenger_)) {
 }
 
 KuduSession::~KuduSession() {
@@ -713,110 +712,67 @@ Status KuduSession::Close() {
 }
 
 Status KuduSession::SetFlushMode(FlushMode m) {
-  if (m == AUTO_FLUSH_BACKGROUND) {
-    return Status::NotSupported("AUTO_FLUSH_BACKGROUND has not been implemented in the"
-        " c++ client (see KUDU-456).");
-  }
-  if (data_->batcher_->HasPendingOperations()) {
-    // TODO: there may be a more reasonable behavior here.
-    return Status::IllegalState("Cannot change flush mode when writes are buffered");
-  }
   if (!tight_enum_test<FlushMode>(m)) {
     // Be paranoid in client code.
     return Status::InvalidArgument("Bad flush mode");
   }
-
-  data_->flush_mode_ = m;
-  return Status::OK();
+  return data_->SetFlushMode(m, shared_from_this());
 }
 
 Status KuduSession::SetExternalConsistencyMode(ExternalConsistencyMode m) {
-  if (data_->batcher_->HasPendingOperations()) {
-    // TODO: there may be a more reasonable behavior here.
-    return Status::IllegalState("Cannot change external consistency mode when writes are "
-        "buffered");
-  }
   if (!tight_enum_test<ExternalConsistencyMode>(m)) {
     // Be paranoid in client code.
     return Status::InvalidArgument("Bad external consistency mode");
   }
+  return data_->SetExternalConsistencyMode(m);
+}
 
-  data_->external_consistency_mode_ = m;
-  return Status::OK();
+Status KuduSession::SetMutationBufferSpace(size_t size) {
+  return data_->SetBufferBytesLimit(size);
+}
+
+Status KuduSession::SetMutationBufferFlushWatermark(double watermark_pct) {
+  return data_->SetBufferFlushWatermark(
+      static_cast<int32_t>(100.0 * watermark_pct));
+}
+
+Status KuduSession::SetMutationBufferFlushInterval(unsigned int millis) {
+  return data_->SetBufferFlushInterval(millis);
 }
 
-void KuduSession::SetTimeoutMillis(int millis) {
-  CHECK_GE(millis, 0);
-  data_->timeout_ms_ = millis;
-  data_->batcher_->SetTimeoutMillis(millis);
+Status KuduSession::SetMutationBufferMaxNum(unsigned int max_num) {
+  return data_->SetMaxBatchersNum(max_num);
+}
+
+void KuduSession::SetTimeoutMillis(int timeout_ms) {
+  data_->SetTimeoutMillis(timeout_ms);
 }
 
 Status KuduSession::Flush() {
-  Synchronizer s;
-  KuduStatusMemberCallback<Synchronizer> ksmcb(&s, &Synchronizer::StatusCB);
-  FlushAsync(&ksmcb);
-  return s.Wait();
+  return data_->Flush();
 }
 
 void KuduSession::FlushAsync(KuduStatusCallback* user_callback) {
-  CHECK_NE(data_->flush_mode_, AUTO_FLUSH_BACKGROUND) <<
-      "AUTO_FLUSH_BACKGROUND has not been implemented";
-
-  // Swap in a new batcher to start building the next batch.
-  // Save off the old batcher.
-  scoped_refptr<Batcher> old_batcher;
-  {
-    std::lock_guard<simple_spinlock> l(data_->lock_);
-    data_->NewBatcher(shared_from_this(), &old_batcher);
-    InsertOrDie(&data_->flushed_batchers_, old_batcher.get());
-  }
-
-  // Send off any buffered data. Important to do this outside of the lock
-  // since the callback may itself try to take the lock, in the case that
-  // the batch fails "inline" on the same thread.
-  old_batcher->FlushAsync(user_callback);
+  data_->FlushAsync(user_callback);
 }
 
 bool KuduSession::HasPendingOperations() const {
-  std::lock_guard<simple_spinlock> l(data_->lock_);
-  if (data_->batcher_->HasPendingOperations()) {
-    return true;
-  }
-  for (Batcher* b : data_->flushed_batchers_) {
-    if (b->HasPendingOperations()) {
-      return true;
-    }
-  }
-  return false;
+  return data_->HasPendingOperations();
 }
 
 Status KuduSession::Apply(KuduWriteOperation* write_op) {
-  if (!write_op->row().IsKeySet()) {
-    Status status = Status::IllegalState("Key not specified", write_op->ToString());
-    data_->error_collector_->AddError(gscoped_ptr<KuduError>(
-        new KuduError(write_op, status)));
-    return status;
-  }
-
-  Status s = data_->batcher_->Add(write_op);
-  if (!PREDICT_FALSE(s.ok())) {
-    data_->error_collector_->AddError(gscoped_ptr<KuduError>(
-        new KuduError(write_op, s)));
-    return s;
-  }
-
+  RETURN_NOT_OK(data_->ApplyWriteOp(shared_from_this(), write_op));
+  // Thread-safety note: this method should not be called concurrently
+  // with other methods which modify the KuduSession::Data members, so it
+  // should be safe to read KuduSession::Data members without protection.
   if (data_->flush_mode_ == AUTO_FLUSH_SYNC) {
-    return Flush();
+    RETURN_NOT_OK(data_->Flush());
   }
-
   return Status::OK();
 }
 
 int KuduSession::CountBufferedOperations() const {
-  std::lock_guard<simple_spinlock> l(data_->lock_);
-  CHECK_EQ(data_->flush_mode_, MANUAL_FLUSH);
-
-  return data_->batcher_->CountBufferedOperations();
+  return data_->CountBufferedOperations();
 }
 
 int KuduSession::CountPendingErrors() const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 2b33e9b..b808ced 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -405,6 +405,7 @@ class KUDU_EXPORT KuduClient : public sp::enable_shared_from_this<KuduClient> {
   friend class KuduClientBuilder;
   friend class KuduScanner;
   friend class KuduScanTokenBuilder;
+  friend class KuduSession;
   friend class KuduTable;
   friend class KuduTableAlterer;
   friend class KuduTableCreator;
@@ -606,7 +607,7 @@ class KUDU_EXPORT KuduTableCreator {
 
   /// Add a range partition to the table.
   ///
-  /// Multiple range partitions may be added, but they must not overlap.  All
+  /// Multiple range partitions may be added, but they must not overlap. All
   /// range splits specified by @c add_range_partition_split must fall in a
   /// range partition. The lower bound must be less than or equal to the upper
   /// bound.
@@ -1113,24 +1114,24 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
     /// This is the default flush mode.
     AUTO_FLUSH_SYNC,
 
-    /// Apply() calls will return immediately, but the writes will be sent
-    /// in the background, potentially batched together with other writes
-    /// from the same session. If there is not sufficient buffer space,
-    /// then Apply() will block for buffer space to be available.
+    /// Apply() calls will return immediately (unless there is not enough
+    /// buffer space to accommodate the newly added operations), but
+    /// the writes will be sent in the background, potentially batched together
+    /// with other writes from the same session. If there is not sufficient
+    /// buffer space, Apply() blocks for buffer space to become available.
     ///
     /// Because writes are applied in the background, any errors will be stored
     /// in a session-local buffer. Call CountPendingErrors() or
     /// GetPendingErrors() to retrieve them.
     ///
-    /// The Flush() call can be used to block until the buffer is empty.
-    ///
-    /// @warning This is not implemented yet, see KUDU-456
+    /// In this mode, calling the FlushAsync() or Flush() methods causes a flush
+    /// that normally would have happened at some point in the near future
+    /// to happen right now. The Flush() call can be used to block until
+    /// the current batch is sent and the reclaimed space is available
+    /// for new operations.
     ///
     /// @todo Provide an API for the user to specify a callback to do their own
     ///   error reporting.
-    ///
-    /// @todo Specify which threads the background activity runs on
-    ///   (probably the messenger IO threads?).
     AUTO_FLUSH_BACKGROUND,
 
     /// Apply() calls will return immediately, and the writes will not be
@@ -1208,15 +1209,93 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
   /// @li MANUAL_FLUSH
   ///   if the buffer space is exhausted, then write calls will return an error
   ///
+  /// By default, the buffer space is set to 7 MiB (i.e. 7 * 1024 * 1024 bytes).
+  ///
   /// @param [in] size_bytes
   ///   Size of the buffer space to set (number of bytes).
   /// @return Operation result status.
   Status SetMutationBufferSpace(size_t size_bytes) WARN_UNUSED_RESULT;
 
+  /// Set the buffer watermark to trigger flush in AUTO_FLUSH_BACKGROUND mode.
+  ///
+  /// This method sets the watermark for fresh operations in the buffer
+  /// when running in AUTO_FLUSH_BACKGROUND mode: once the specified threshold
+  /// is reached, the session starts sending the accumulated write operations
+  /// to the appropriate tablet servers. By default, the buffer flush watermark
+  /// is to to 80%.
+  ///
+  /// @note This setting is applicable only for AUTO_FLUSH_BACKGROUND sessions.
+  ///   I.e., calling this method in other flush modes is safe, but
+  ///   the parameter has no effect until the session is switched into
+  ///   AUTO_FLUSH_BACKGROUND mode.
+  ///
+  /// @note The buffer contains data for fresh (i.e. newly submitted)
+  ///   operations and also operations which are scheduled for flush or being
+  ///   flushed. The flush watermark determines how much of the buffer space
+  ///   is taken by newly submitted operations. Setting this level to 1.0
+  ///   (i.e. 100%) results in flushing the buffer only when the newly applied
+  ///   operation would overflow the buffer.
+  ///
+  /// @param [in] watermark_pct
+  ///   Watermark level as percentage of the mutation buffer size.
+  /// @return Operation result status.
+  Status SetMutationBufferFlushWatermark(double watermark_pct)
+      WARN_UNUSED_RESULT;
+
+  /// Set the interval for time-based flushing of the mutation buffer.
+  ///
+  /// In some cases, while running in AUTO_FLUSH_BACKGROUND mode, the size
+  /// of the mutation buffer for pending operations and the flush watermark
+  /// for fresh operations may be too high for the rate of incoming data:
+  /// it would take too long to accumulate enough data in the buffer to trigger
+  /// flushing. I.e., it makes sense to flush the accumulated operations
+  /// if the prior flush happened long time ago. This method sets the wait
+  /// interval for the time-based flushing which takes place along with
+  /// the flushing triggered by the over-the-watermark criterion.
+  /// By default, the interval is set to 1000 ms (i.e. 1 second).
+  ///
+  /// @note This setting is applicable only for AUTO_FLUSH_BACKGROUND sessions.
+  ///   I.e., calling this method in other flush modes is safe, but
+  ///   the parameter has no effect until the session is switched into
+  ///   AUTO_FLUSH_BACKGROUND mode.
+  ///
+  /// @param [in] millis
+  ///   The duration of the interval for the time-based flushing,
+  ///   in milliseconds.
+  /// @return Operation result status.
+  Status SetMutationBufferFlushInterval(unsigned int millis) WARN_UNUSED_RESULT;
+
+  /// Set the maximum number of mutation buffers per KuduSession object.
+  ///
+  /// A KuduSession accumulates write operations submitted via the Apply()
+  /// method in mutation buffers. A KuduSession always has at least one
+  /// mutation buffer. The mutation buffer which accumulates new incoming
+  /// operations is called the <em>current mutation buffer</em>.
+  /// The current mutation buffer is flushed either explicitly using
+  /// the KuduSession::Flush() and/or KuduSession::FlushAsync() methods
+  /// or it's done by the KuduSession automatically if running in
+  /// AUTO_FLUSH_BACKGROUND mode. After flushing the current mutation buffer,
+  /// a new buffer is created upon calling KuduSession::Apply(),
+  /// provided the limit is not exceeded. A call to KuduSession::Apply() blocks
+  /// if it's at the maximum number of buffers allowed; the call unblocks
+  /// as soon as one of the pending batchers finished flushing and a new batcher
+  /// can be created.
+  ///
+  /// The minimum setting for this parameter is 1 (one).
+  /// The default setting for this parameter is 2 (two).
+  ///
+  /// @param [in] max_num
+  ///   The maximum number of mutation buffers per KuduSession object
+  ///   to hold the applied operations. Use @c 0 to set the maximum number
+  ///   of concurrent mutation buffers to unlimited.
+  /// @return Operation result status.
+  Status SetMutationBufferMaxNum(unsigned int max_num) WARN_UNUSED_RESULT;
+
   /// Set the timeout for writes made in this session.
   ///
   /// @param [in] millis
-  ///   Timeout to set in milliseconds; should be greater than 0.
+  ///   Timeout to set in milliseconds; should be greater or equal to 0.
+  ///   If the parameter value is less than 0, it's implicitly set to 0.
   void SetTimeoutMillis(int millis);
 
   /// @todo
@@ -1235,6 +1314,13 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
   /// the write_op is malformed, the write_op is stored in the session's error
   /// collector which may be retrieved at any time.
   ///
+  /// A KuduSession accumulates write operations submitted via the Apply()
+  /// method in mutation buffers. A KuduSession always has at least one
+  /// mutation buffer. In any flush mode, this call may block if the maximum
+  /// number of mutation buffers per session is reached
+  /// (use KuduSession::SetMutationBufferMaxNum() to set the limit
+  /// on maximum number of batchers).
+  ///
   /// @param [in] write_op
   ///   Operation to apply. This method transfers the write_op's ownership
   ///   to the KuduSession.
@@ -1260,8 +1346,13 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
 
   /// Flush any pending writes.
   ///
-  /// In @c AUTO_FLUSH_SYNC mode, this has no effect, since every Apply() call
-  /// flushes itself inline.
+  /// This method initiates flushing of the current batch of buffered
+  /// write operations, if any, and then awaits for completion of all
+  /// pending operations of the session. I.e., after successful return
+  /// from this method no pending operations should be left in the session.
+  ///
+  /// In @c AUTO_FLUSH_SYNC mode, calling this method has no effect,
+  /// since every Apply() call flushes itself inline.
   ///
   /// @return Operation result status. In particular, returns a non-OK status
   ///   if there are any pending errors after the rows have been flushed.
@@ -1271,17 +1362,14 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
 
   /// Flush any pending writes asynchronously.
   ///
-  /// This method schedules a background flush of pending operations.
-  /// Provided callback is invoked upon completion of the flush.
+  /// This method schedules a background flush of the latest batch of buffered
+  /// write operations. Provided callback is invoked upon the flush completion
+  /// of the latest batch of buffered write operations.
   /// If there were errors while flushing the operations, corresponding
   /// 'not OK' status is passed as a parameter for the callback invocation.
   /// Callers should then use GetPendingErrors() to determine which specific
   /// operations failed.
   ///
-  /// @param [in] cb
-  ///   Callback to call upon flush completion. The @c cb must remain valid
-  ///   until it is invoked.
-  ///
   /// In the case that the async version of this method is used, then
   /// the callback will be called upon completion of the operations which
   /// were buffered since the last flush. In other words, in the following
@@ -1293,7 +1381,9 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
   ///   session->FlushAsync(callback_2);
   /// @endcode
   /// ... @c callback_2 will be triggered once @c b has been inserted,
-  /// regardless of whether @c a has completed or not.
+  /// regardless of whether @c a has completed or not. That means there might be
+  /// pending operations left in prior batches even after the the callback
+  /// has been invoked to report on the flush status of the latest batch.
   ///
   /// @note This also means that, if FlushAsync is called twice in succession,
   /// with no intervening operations, the second flush will return immediately.
@@ -1306,6 +1396,10 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
   /// Note that, as in all other async functions in Kudu, the callback
   /// may be called either from an IO thread or the same thread which calls
   /// FlushAsync. The callback should not block.
+  ///
+  /// @param [in] cb
+  ///   Callback to call upon flush completion. The @c cb must remain valid
+  ///   until it is invoked.
   void FlushAsync(KuduStatusCallback* cb);
 
   /// @return Status of the session closure. In particular, an error is returned
@@ -1363,6 +1457,9 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
 
   friend class KuduClient;
   friend class internal::Batcher;
+  friend class ClientTest;
+  FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
+
   explicit KuduSession(const sp::shared_ptr<KuduClient>& client);
 
   // Owned.

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/error_collector.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/error_collector.cc b/src/kudu/client/error_collector.cc
index 6215b8a..02206cc 100644
--- a/src/kudu/client/error_collector.cc
+++ b/src/kudu/client/error_collector.cc
@@ -27,9 +27,6 @@ namespace kudu {
 namespace client {
 namespace internal {
 
-ErrorCollector::ErrorCollector() {
-}
-
 ErrorCollector::~ErrorCollector() {
   STLDeleteElements(&errors_);
 }
@@ -39,7 +36,7 @@ void ErrorCollector::AddError(gscoped_ptr<KuduError> error) {
   errors_.push_back(error.release());
 }
 
-int ErrorCollector::CountErrors() const {
+size_t ErrorCollector::CountErrors() const {
   std::lock_guard<simple_spinlock> l(lock_);
   return errors_.size();
 }
@@ -55,7 +52,6 @@ void ErrorCollector::GetErrors(std::vector<KuduError*>* errors, bool* overflowed
   }
 }
 
-
 } // namespace internal
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/93be1310/src/kudu/client/error_collector.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/error_collector.h b/src/kudu/client/error_collector.h
index c1ef640..42cd0c7 100644
--- a/src/kudu/client/error_collector.h
+++ b/src/kudu/client/error_collector.h
@@ -35,12 +35,12 @@ namespace internal {
 
 class ErrorCollector : public RefCountedThreadSafe<ErrorCollector> {
  public:
-  ErrorCollector();
+  ErrorCollector() = default;
 
   void AddError(gscoped_ptr<KuduError> error);
 
   // See KuduSession for details.
-  int CountErrors() const;
+  size_t CountErrors() const;
 
   // See KuduSession for details.
   void GetErrors(std::vector<KuduError*>* errors, bool* overflowed);