You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/12/02 21:42:48 UTC

[1/2] kudu git commit: [catalog_manager] removed unused variable

Repository: kudu
Updated Branches:
  refs/heads/master b15c0f6e3 -> 60aa54e21


[catalog_manager] removed unused variable

This patch does not contain any functional changes.

Change-Id: I6dfe92e06b6d4381dab4190615293aa47386fc6d
Reviewed-on: http://gerrit.cloudera.org:8080/5331
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7cb65591
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7cb65591
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7cb65591

Branch: refs/heads/master
Commit: 7cb65591e18dd6394272d8cca4d653b49d6dd4b9
Parents: b15c0f6
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Nov 17 23:16:01 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Dec 2 21:37:17 2016 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/7cb65591/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 0f3dff3..3df0d46 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -42,9 +42,7 @@
 #include "kudu/master/catalog_manager.h"
 
 #include <algorithm>
-#include <boost/optional.hpp>
 #include <condition_variable>
-#include <glog/logging.h>
 #include <memory>
 #include <mutex>
 #include <set>
@@ -52,6 +50,9 @@
 #include <utility>
 #include <vector>
 
+#include <boost/optional.hpp>
+#include <glog/logging.h>
+
 #include "kudu/cfile/type_encodings.h"
 #include "kudu/common/key_util.h"
 #include "kudu/common/partial_row.h"
@@ -3561,7 +3562,6 @@ Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
   vector<scoped_refptr<TabletInfo> > tablets_in_range;
   table->GetTabletsInRange(req, &tablets_in_range);
 
-  ServerRegistrationPB reg;
   for (const scoped_refptr<TabletInfo>& tablet : tablets_in_range) {
     Status s = BuildLocationsForTablet(tablet, resp->add_tablet_locations());
     if (s.ok()) {


[2/2] kudu git commit: KUDU-1752 C++ client error memory should be bounded

Posted by ad...@apache.org.
KUDU-1752 C++ client error memory should be bounded

Added KuduSession::SetErrorBufferSpace() method to set limit on maximum
size of memory consumed by session errors.  To preserve the legacy
behavior, a session is created with no limit on error memory size.

Change-Id: I0b9fe3e83bab2a0b703b685ec7f3bb1db1601e5f
Reviewed-on: http://gerrit.cloudera.org:8080/5308
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/60aa54e2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/60aa54e2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/60aa54e2

Branch: refs/heads/master
Commit: 60aa54e21b715a4e7a1931570c9736a2c7f211c6
Parents: 7cb6559
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Dec 1 10:16:51 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Dec 2 21:42:08 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc     | 55 +++++++++++++++++++++++++++
 src/kudu/client/client-unittest.cc | 43 ++++++++++++++++++++-
 src/kudu/client/client.cc          |  4 ++
 src/kudu/client/client.h           | 33 ++++++++++++++++-
 src/kudu/client/error-internal.cc  |  3 --
 src/kudu/client/error-internal.h   |  2 +-
 src/kudu/client/error_collector.cc | 66 ++++++++++++++++++++++++++++++---
 src/kudu/client/error_collector.h  | 12 +++++-
 src/kudu/client/write_op.h         |  2 +
 9 files changed, 206 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 7827912..4e7aaa5 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -4332,5 +4332,60 @@ TEST_F(ClientTest, TestGetTablet) {
   }
 }
 
+TEST_F(ClientTest, TestErrorCollector) {
+    shared_ptr<KuduSession> session(client_->NewSession());
+    ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+    NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 1, 0));
+    ASSERT_OK(session->Flush());
+
+    // Set the maximum size limit too low even for a single error
+    // and make sure the error collector reports overflow and drops the error.
+    {
+      ASSERT_OK(session->SetErrorBufferSpace(1));
+      // Trying to insert a duplicate row.
+      NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 1, 0));
+      // Expecting an error since tried to insert a duplicate key.
+      ASSERT_TRUE((session->Flush()).IsIOError());
+
+      // It's impossible to update the error buffer size because
+      // the error collector's buffer has overflowed and one error has been dropped.
+      EXPECT_TRUE(session->SetErrorBufferSpace(0).IsIllegalState());
+
+      vector<KuduError*> errors;
+      ElementDeleter drop(&errors);
+      bool overflowed;
+      session->GetPendingErrors(&errors, &overflowed);
+      EXPECT_TRUE(errors.empty());
+      EXPECT_TRUE(overflowed);
+    }
+
+    // After calling GetPendingErrors() and retrieving the errors, it's
+    // possible to update the limit on the error buffer size. Besides, the error
+    // collector should be able to accommodate the duplicate row error
+    // if the error fits the buffer.
+    {
+      ASSERT_OK(session->SetErrorBufferSpace(1024));
+      NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 1, 0));
+      // Expecting an error: tried to insert a duplicate key.
+      ASSERT_TRUE((session->Flush()).IsIOError());
+
+      // It's impossible to update the error buffer size if the error collector
+      // would overflow.
+      EXPECT_TRUE(session->SetErrorBufferSpace(1).IsIllegalState());
+
+      // It's OK to update the error buffer size because the new limit is high
+      // enough and the error collector hasn't dropped a single error yet.
+      EXPECT_OK(session->SetErrorBufferSpace(2048));
+
+      vector<KuduError*> errors;
+      ElementDeleter drop(&errors);
+      bool overflowed;
+      session->GetPendingErrors(&errors, &overflowed);
+      EXPECT_EQ(1, errors.size());
+      EXPECT_FALSE(overflowed);
+    }
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/client-unittest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-unittest.cc b/src/kudu/client/client-unittest.cc
index 03e87b8..403044c 100644
--- a/src/kudu/client/client-unittest.cc
+++ b/src/kudu/client/client-unittest.cc
@@ -17,16 +17,21 @@
 //
 // Tests for the client which are true unit tests and don't require a cluster, etc.
 
-#include <boost/bind.hpp>
-#include <gtest/gtest.h>
 #include <string>
 #include <vector>
 
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+
 #include "kudu/client/client.h"
 #include "kudu/client/client-internal.h"
+#include "kudu/client/error_collector.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/test_macros.h"
 
 using std::string;
 using std::vector;
+using kudu::client::internal::ErrorCollector;
 
 namespace kudu {
 namespace client {
@@ -169,6 +174,40 @@ TEST(ClientUnitTest, TestRetryFunc) {
   ASSERT_LT(counter, 20);
 }
 
+TEST(ClientUnitTest, TestErrorCollector) {
+  {
+    scoped_refptr<ErrorCollector> ec(new ErrorCollector);
+    // Setting the max memory size limit to 'unlimited'.
+    EXPECT_OK(ec->SetMaxMemSize(0));
+    // Setting the max memory size to 1 byte.
+    EXPECT_OK(ec->SetMaxMemSize(1));
+  }
+
+  // Check that the error collector does not allow setting the memory size limit
+  // if at least one error has been dropped since the last flush.
+  {
+    scoped_refptr<ErrorCollector> ec(new ErrorCollector);
+    ec->dropped_errors_cnt_ = 1;
+    Status s = ec->SetMaxMemSize(0);
+    EXPECT_TRUE(s.IsIllegalState());
+    ASSERT_STR_CONTAINS(s.ToString(),
+                        "cannot set new limit: already dropped some errors");
+  }
+
+  // Check that the error collector does not overflow post-factum on call of the
+  // SetMaxMemSize() method.
+  {
+    const size_t size_bytes = 8;
+    scoped_refptr<ErrorCollector> ec(new ErrorCollector);
+    ec->mem_size_bytes_ = size_bytes;
+    EXPECT_OK(ec->SetMaxMemSize(0));
+    EXPECT_OK(ec->SetMaxMemSize(size_bytes));
+    Status s = ec->SetMaxMemSize(size_bytes - 1);
+    EXPECT_TRUE(s.IsIllegalState());
+    ASSERT_STR_CONTAINS(s.ToString(), "already accumulated errors for");
+  }
+}
+
 } // namespace client
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 974b4be..b634548 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -859,6 +859,10 @@ int KuduSession::CountBufferedOperations() const {
   return data_->CountBufferedOperations();
 }
 
+Status KuduSession::SetErrorBufferSpace(size_t size_bytes) {
+  return data_->error_collector_->SetMaxMemSize(size_bytes);
+}
+
 int KuduSession::CountPendingErrors() const {
   return data_->error_collector_->CountErrors();
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 506a19f..2472254 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -69,6 +69,7 @@ class KuduWriteOperation;
 
 namespace internal {
 class Batcher;
+class ErrorCollector;
 class GetTableSchemaRpc;
 class LookupRpc;
 class MetaCache;
@@ -1123,6 +1124,7 @@ class KUDU_EXPORT KuduError {
   class KUDU_NO_EXPORT Data;
 
   friend class internal::Batcher;
+  friend class internal::ErrorCollector;
   friend class KuduSession;
 
   KuduError(KuduWriteOperation* failed_op, const Status& error);
@@ -1514,10 +1516,39 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
       ATTRIBUTE_DEPRECATED("this method is experimental and will disappear "
                            "in a future release");
 
+  /// Set limit on maximum buffer (memory) size used by this session's errors.
+  /// By default, when a session is created, there is no limit on maximum size.
+  ///
+  /// The session's error buffer contains information on failed write
+  /// operations. In most cases, the error contains the row which would be
+  /// applied as is. If the error buffer space limit is set, the number of
+  /// errors which fit into the buffer varies depending on error conditions,
+  /// write operation types (insert/update/delete), and write operation
+  /// row sizes.
+  ///
+  /// When the limit is set, the session will drop the first error that would
+  /// overflow the buffer as well as all subsequent errors. To resume the
+  /// accumulation of session errors, it's necessary to flush the current
+  /// contents of the error buffer using the GetPendingErrors() method.
+  ///
+  /// @param [in] size_bytes
+  ///   Limit on the maximum memory size consumed by collected session errors,
+  ///   where @c 0 means 'unlimited'.
+  /// @return Operation result status. An error is returned on an attempt
+  ///   to set the limit on the buffer space if:
+  ///   @li the session has already dropped at least one error since the last
+  ///     call to the GetPendingErrors() method
+  ///   @li the new limit is less than the amount of space occupied by already
+  ///     accumulated errors.
+  Status SetErrorBufferSpace(size_t size_bytes);
+
   /// Get error count for pending operations.
   ///
   /// Errors may accumulate in session's lifetime; use this method to
   /// see how many errors happened since last call of GetPendingErrors() method.
+  /// The error count includes both the accumulated and dropped errors. An error
+  /// might be dropped due to the limit on the error buffer size; see the
+  /// SetErrorBufferSpace() method for details.
   ///
   /// @return Total count of errors accumulated during the session.
   int CountPendingErrors() const;
@@ -1531,7 +1562,7 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
   ///   ownership of the returned errors in the container.
   /// @param [out] overflowed
   ///   If there were more errors than could be held in the session's error
-  ///   storage, then @c overflowed is set to @c true.
+  ///   buffer, then @c overflowed is set to @c true.
   void GetPendingErrors(std::vector<KuduError*>* errors, bool* overflowed);
 
   /// @return Client for the session: pointer to the associated client object.

http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/error-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/error-internal.cc b/src/kudu/client/error-internal.cc
index 78a62c9..efb673e 100644
--- a/src/kudu/client/error-internal.cc
+++ b/src/kudu/client/error-internal.cc
@@ -27,8 +27,5 @@ KuduError::Data::Data(gscoped_ptr<KuduWriteOperation> failed_op,
   status_(status) {
 }
 
-KuduError::Data::~Data() {
-}
-
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/error-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/error-internal.h b/src/kudu/client/error-internal.h
index 78ccad4..cfbd2fe 100644
--- a/src/kudu/client/error-internal.h
+++ b/src/kudu/client/error-internal.h
@@ -27,7 +27,7 @@ namespace client {
 class KuduError::Data {
  public:
   Data(gscoped_ptr<KuduWriteOperation> failed_op, const Status& error);
-  ~Data();
+  ~Data() = default;
 
   gscoped_ptr<KuduWriteOperation> failed_op_;
   Status status_;

http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/error_collector.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/error_collector.cc b/src/kudu/client/error_collector.cc
index 02206cc..db89601 100644
--- a/src/kudu/client/error_collector.cc
+++ b/src/kudu/client/error_collector.cc
@@ -15,41 +15,95 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "kudu/client/client.h"
 #include "kudu/client/error_collector.h"
 
 #include <mutex>
 #include <vector>
 
+#include "kudu/client/client.h"
+#include "kudu/client/error-internal.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+using strings::Substitute;
 
 namespace kudu {
 namespace client {
 namespace internal {
 
+ErrorCollector::ErrorCollector()
+    : max_mem_size_bytes_(kMemSizeNoLimit),
+      mem_size_bytes_(0),
+      dropped_errors_cnt_(0) {
+}
+
 ErrorCollector::~ErrorCollector() {
   STLDeleteElements(&errors_);
 }
 
+Status ErrorCollector::SetMaxMemSize(size_t size_bytes) {
+  std::lock_guard<simple_spinlock> l(lock_);
+  if (dropped_errors_cnt_ > 0) {
+    // The error collector has dropped some errors already: do not allow
+    // to change the limit on memory size in this case at all. We want
+    // to preserve consistency in the set of accumulated errors -- do not
+    // allow a situation when the accumulated errors contain 'holes'.
+    return Status::IllegalState(
+        "cannot set new limit: already dropped some errors");
+  }
+  if (size_bytes != kMemSizeNoLimit && size_bytes < mem_size_bytes_) {
+    // Do not overflow the error collector post-factum.
+    return Status::IllegalState(
+        Substitute("new limit of $0 bytes: already accumulated errors "
+                   "for $1 bytes", size_bytes, mem_size_bytes_));
+  }
+  max_mem_size_bytes_ = size_bytes;
+
+  return Status::OK();
+}
+
 void ErrorCollector::AddError(gscoped_ptr<KuduError> error) {
   std::lock_guard<simple_spinlock> l(lock_);
+  const size_t error_size_bytes = error->data_->failed_op_->SizeInBuffer();
+
+  // If the maximum limit on the memory size is set, check whether the
+  // total-size-to-be would be greater than the specified limit after adding
+  // a new item into the collection: if yes, then drop this and all subsequent
+  // errors.
+  //
+  // Once an error has been dropped, drop all the subsequent ones even if they
+  // would fit into the available memory -- this is to preserve consistent
+  // sequencing of the accumulated errors.
+  if (max_mem_size_bytes_ != kMemSizeNoLimit &&
+      (dropped_errors_cnt_ > 0 ||
+       error_size_bytes + mem_size_bytes_ > max_mem_size_bytes_)) {
+    ++dropped_errors_cnt_;
+    return;
+  }
+  mem_size_bytes_ += error_size_bytes;
   errors_.push_back(error.release());
 }
 
 size_t ErrorCollector::CountErrors() const {
   std::lock_guard<simple_spinlock> l(lock_);
-  return errors_.size();
+  return errors_.size() + dropped_errors_cnt_;
 }
 
-void ErrorCollector::GetErrors(std::vector<KuduError*>* errors, bool* overflowed) {
+void ErrorCollector::GetErrors(std::vector<KuduError*>* errors,
+                               bool* overflowed) {
   std::lock_guard<simple_spinlock> l(lock_);
+  if (overflowed != nullptr) {
+    *overflowed = (dropped_errors_cnt_ != 0);
+  }
   if (errors != nullptr) {
     errors->clear();
     errors->swap(errors_);
+  } else {
+    STLDeleteElements(&errors_);
   }
-  if (overflowed != nullptr) {
-    *overflowed = false;
-  }
+  dropped_errors_cnt_ = 0;
+  mem_size_bytes_ = 0;
 }
 
 } // namespace internal

http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/error_collector.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/error_collector.h b/src/kudu/client/error_collector.h
index 8d4d393..33e70c3 100644
--- a/src/kudu/client/error_collector.h
+++ b/src/kudu/client/error_collector.h
@@ -28,6 +28,7 @@
 namespace kudu {
 namespace client {
 
+class ClientUnitTest_TestErrorCollector_Test;
 class KuduError;
 class KuduInsert;
 
@@ -35,7 +36,12 @@ namespace internal {
 
 class ErrorCollector : public RefCountedThreadSafe<ErrorCollector> {
  public:
-  ErrorCollector() = default;
+  static const size_t kMemSizeNoLimit = 0;
+
+  ErrorCollector();
+
+  // See KuduSession::SetErrorBufferSpace() for details.
+  Status SetMaxMemSize(size_t size_bytes);
 
   virtual void AddError(gscoped_ptr<KuduError> error);
 
@@ -49,10 +55,14 @@ class ErrorCollector : public RefCountedThreadSafe<ErrorCollector> {
   virtual ~ErrorCollector();
 
  private:
+  friend class ::kudu::client::ClientUnitTest_TestErrorCollector_Test;
   friend class RefCountedThreadSafe<ErrorCollector>;
 
   mutable simple_spinlock lock_;
   std::vector<KuduError*> errors_;
+  size_t max_mem_size_bytes_;
+  size_t mem_size_bytes_;
+  size_t dropped_errors_cnt_;
 
   DISALLOW_COPY_AND_ASSIGN(ErrorCollector);
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/write_op.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h
index 16fe5af..6513eb7 100644
--- a/src/kudu/client/write_op.h
+++ b/src/kudu/client/write_op.h
@@ -31,6 +31,7 @@ namespace client {
 
 namespace internal {
 class Batcher;
+class ErrorCollector;
 class WriteRpc;
 } // namespace internal
 
@@ -100,6 +101,7 @@ class KUDU_EXPORT KuduWriteOperation {
  private:
   friend class internal::Batcher;
   friend class internal::WriteRpc;
+  friend class internal::ErrorCollector;
 
   // Create and encode the key for this write (key must be set)
   //