You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2016/12/02 21:42:48 UTC
[1/2] kudu git commit: [catalog_manager] removed unused variable
Repository: kudu
Updated Branches:
refs/heads/master b15c0f6e3 -> 60aa54e21
[catalog_manager] removed unused variable
This patch does not contain any functional changes.
Change-Id: I6dfe92e06b6d4381dab4190615293aa47386fc6d
Reviewed-on: http://gerrit.cloudera.org:8080/5331
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/7cb65591
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/7cb65591
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/7cb65591
Branch: refs/heads/master
Commit: 7cb65591e18dd6394272d8cca4d653b49d6dd4b9
Parents: b15c0f6
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Nov 17 23:16:01 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Dec 2 21:37:17 2016 +0000
----------------------------------------------------------------------
src/kudu/master/catalog_manager.cc | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/7cb65591/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 0f3dff3..3df0d46 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -42,9 +42,7 @@
#include "kudu/master/catalog_manager.h"
#include <algorithm>
-#include <boost/optional.hpp>
#include <condition_variable>
-#include <glog/logging.h>
#include <memory>
#include <mutex>
#include <set>
@@ -52,6 +50,9 @@
#include <utility>
#include <vector>
+#include <boost/optional.hpp>
+#include <glog/logging.h>
+
#include "kudu/cfile/type_encodings.h"
#include "kudu/common/key_util.h"
#include "kudu/common/partial_row.h"
@@ -3561,7 +3562,6 @@ Status CatalogManager::GetTableLocations(const GetTableLocationsRequestPB* req,
vector<scoped_refptr<TabletInfo> > tablets_in_range;
table->GetTabletsInRange(req, &tablets_in_range);
- ServerRegistrationPB reg;
for (const scoped_refptr<TabletInfo>& tablet : tablets_in_range) {
Status s = BuildLocationsForTablet(tablet, resp->add_tablet_locations());
if (s.ok()) {
[2/2] kudu git commit: KUDU-1752 C++ client error memory should be bounded
Posted by ad...@apache.org.
KUDU-1752 C++ client error memory should be bounded
Added KuduSession::SetErrorBufferSpace() method to set limit on maximum
size of memory consumed by session errors. To preserve the legacy
behavior, a session is created with no limit on error memory size.
Change-Id: I0b9fe3e83bab2a0b703b685ec7f3bb1db1601e5f
Reviewed-on: http://gerrit.cloudera.org:8080/5308
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/60aa54e2
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/60aa54e2
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/60aa54e2
Branch: refs/heads/master
Commit: 60aa54e21b715a4e7a1931570c9736a2c7f211c6
Parents: 7cb6559
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Dec 1 10:16:51 2016 -0800
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Dec 2 21:42:08 2016 +0000
----------------------------------------------------------------------
src/kudu/client/client-test.cc | 55 +++++++++++++++++++++++++++
src/kudu/client/client-unittest.cc | 43 ++++++++++++++++++++-
src/kudu/client/client.cc | 4 ++
src/kudu/client/client.h | 33 ++++++++++++++++-
src/kudu/client/error-internal.cc | 3 --
src/kudu/client/error-internal.h | 2 +-
src/kudu/client/error_collector.cc | 66 ++++++++++++++++++++++++++++++---
src/kudu/client/error_collector.h | 12 +++++-
src/kudu/client/write_op.h | 2 +
9 files changed, 206 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 7827912..4e7aaa5 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -4332,5 +4332,60 @@ TEST_F(ClientTest, TestGetTablet) {
}
}
+TEST_F(ClientTest, TestErrorCollector) {
+ shared_ptr<KuduSession> session(client_->NewSession());
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+
+ NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 1, 0));
+ ASSERT_OK(session->Flush());
+
+ // Set the maximum size limit too low even for a single error
+ // and make sure the error collector reports overflow and drops the error.
+ {
+ ASSERT_OK(session->SetErrorBufferSpace(1));
+ // Trying to insert a duplicate row.
+ NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 1, 0));
+ // Expecting an error since tried to insert a duplicate key.
+ ASSERT_TRUE((session->Flush()).IsIOError());
+
+ // It's impossible to update the error buffer size because
+ // the error collector's buffer has overflowed and one error has been dropped.
+ EXPECT_TRUE(session->SetErrorBufferSpace(0).IsIllegalState());
+
+ vector<KuduError*> errors;
+ ElementDeleter drop(&errors);
+ bool overflowed;
+ session->GetPendingErrors(&errors, &overflowed);
+ EXPECT_TRUE(errors.empty());
+ EXPECT_TRUE(overflowed);
+ }
+
+ // After calling the GetPendingErrors() and retrieving the errors, it's
+ // possible to update the limit on the error buffer size. Besides, the error
+ // collector should be able to accommodate the duplicate row error
+ // if the error fits the buffer.
+ {
+ ASSERT_OK(session->SetErrorBufferSpace(1024));
+ NO_FATALS(InsertTestRows(client_table_.get(), session.get(), 1, 0));
+ // Expecting an error: tried to insert a duplicate key.
+ ASSERT_TRUE((session->Flush()).IsIOError());
+
+ // It's impossible to update the error buffer size if the error collector
+ // would overflow.
+ EXPECT_TRUE(session->SetErrorBufferSpace(1).IsIllegalState());
+
+ // It's OK to update the error buffer size because the new limit is high
+ // enough and the error collector hasn't dropped a single error yet.
+ EXPECT_OK(session->SetErrorBufferSpace(2048));
+
+ vector<KuduError*> errors;
+ ElementDeleter drop(&errors);
+ bool overflowed;
+ session->GetPendingErrors(&errors, &overflowed);
+ EXPECT_EQ(1, errors.size());
+ EXPECT_FALSE(overflowed);
+ }
+}
+
} // namespace client
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/client-unittest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-unittest.cc b/src/kudu/client/client-unittest.cc
index 03e87b8..403044c 100644
--- a/src/kudu/client/client-unittest.cc
+++ b/src/kudu/client/client-unittest.cc
@@ -17,16 +17,21 @@
//
// Tests for the client which are true unit tests and don't require a cluster, etc.
-#include <boost/bind.hpp>
-#include <gtest/gtest.h>
#include <string>
#include <vector>
+#include <boost/bind.hpp>
+#include <gtest/gtest.h>
+
#include "kudu/client/client.h"
#include "kudu/client/client-internal.h"
+#include "kudu/client/error_collector.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/test_macros.h"
using std::string;
using std::vector;
+using kudu::client::internal::ErrorCollector;
namespace kudu {
namespace client {
@@ -169,6 +174,40 @@ TEST(ClientUnitTest, TestRetryFunc) {
ASSERT_LT(counter, 20);
}
+TEST(ClientUnitTest, TestErrorCollector) {
+ {
+ scoped_refptr<ErrorCollector> ec(new ErrorCollector);
+ // Setting the max memory size limit to 'unlimited'.
+ EXPECT_OK(ec->SetMaxMemSize(0));
+ // Setting the max memory size to 1 byte.
+ EXPECT_OK(ec->SetMaxMemSize(1));
+ }
+
+ // Check that the error collector does not allow setting the memory size limit
+ // if at least one error has been dropped since last flush.
+ {
+ scoped_refptr<ErrorCollector> ec(new ErrorCollector);
+ ec->dropped_errors_cnt_ = 1;
+ Status s = ec->SetMaxMemSize(0);
+ EXPECT_TRUE(s.IsIllegalState());
+ ASSERT_STR_CONTAINS(s.ToString(),
+ "cannot set new limit: already dropped some errors");
+ }
+
+ // Check that the error collector does not overflow post-factum on call of the
+ // SetMaxMemSize() method.
+ {
+ const size_t size_bytes = 8;
+ scoped_refptr<ErrorCollector> ec(new ErrorCollector);
+ ec->mem_size_bytes_ = size_bytes;
+ EXPECT_OK(ec->SetMaxMemSize(0));
+ EXPECT_OK(ec->SetMaxMemSize(size_bytes));
+ Status s = ec->SetMaxMemSize(size_bytes - 1);
+ EXPECT_TRUE(s.IsIllegalState());
+ ASSERT_STR_CONTAINS(s.ToString(), "already accumulated errors for");
+ }
+}
+
} // namespace client
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/client.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 974b4be..b634548 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -859,6 +859,10 @@ int KuduSession::CountBufferedOperations() const {
return data_->CountBufferedOperations();
}
+Status KuduSession::SetErrorBufferSpace(size_t size_bytes) {
+ return data_->error_collector_->SetMaxMemSize(size_bytes);
+}
+
int KuduSession::CountPendingErrors() const {
return data_->error_collector_->CountErrors();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 506a19f..2472254 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -69,6 +69,7 @@ class KuduWriteOperation;
namespace internal {
class Batcher;
+class ErrorCollector;
class GetTableSchemaRpc;
class LookupRpc;
class MetaCache;
@@ -1123,6 +1124,7 @@ class KUDU_EXPORT KuduError {
class KUDU_NO_EXPORT Data;
friend class internal::Batcher;
+ friend class internal::ErrorCollector;
friend class KuduSession;
KuduError(KuduWriteOperation* failed_op, const Status& error);
@@ -1514,10 +1516,39 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
ATTRIBUTE_DEPRECATED("this method is experimental and will disappear "
"in a future release");
+ /// Set limit on maximum buffer (memory) size used by this session's errors.
+ /// By default, when a session is created, there is no limit on maximum size.
+ ///
+ /// The session's error buffer contains information on failed write
+ /// operations. In most cases, the error contains the row which would be
+ /// applied as is. If the error buffer space limit is set, the number of
+ /// errors which fit into the buffer varies depending on error conditions,
+ /// write operation types (insert/update/delete), and write operation
+ /// row sizes.
+ ///
+ /// When the limit is set, the session will drop the first error that would
+ /// overflow the buffer as well as all subsequent errors. To resume the
+ /// accumulation of session errors, it's necessary to flush the current
+ /// contents of the error buffer using the GetPendingErrors() method.
+ ///
+ /// @param [in] size_bytes
+ /// Limit on the maximum memory size consumed by collected session errors,
+ /// where @c 0 means 'unlimited'.
+ /// @return Operation result status. An error is returned on an attempt
+ /// to set the limit on the buffer space if:
+ /// @li the session has already dropped at least one error since the last
+ /// call to the GetPendingErrors() method
+ /// @li the new limit is less than the amount of space occupied by already
+ /// accumulated errors.
+ Status SetErrorBufferSpace(size_t size_bytes);
+
/// Get error count for pending operations.
///
/// Errors may accumulate in session's lifetime; use this method to
/// see how many errors happened since last call of GetPendingErrors() method.
+ /// The error count includes both the accumulated and dropped errors. An error
+ /// might be dropped due to the limit on the error buffer size; see the
+ /// SetErrorBufferSpace() method for details.
///
/// @return Total count of errors accumulated during the session.
int CountPendingErrors() const;
@@ -1531,7 +1562,7 @@ class KUDU_EXPORT KuduSession : public sp::enable_shared_from_this<KuduSession>
/// ownership of the returned errors in the container.
/// @param [out] overflowed
/// If there were more errors than could be held in the session's error
- /// storage, then @c overflowed is set to @c true.
+ /// buffer, then @c overflowed is set to @c true.
void GetPendingErrors(std::vector<KuduError*>* errors, bool* overflowed);
/// @return Client for the session: pointer to the associated client object.
http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/error-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/error-internal.cc b/src/kudu/client/error-internal.cc
index 78a62c9..efb673e 100644
--- a/src/kudu/client/error-internal.cc
+++ b/src/kudu/client/error-internal.cc
@@ -27,8 +27,5 @@ KuduError::Data::Data(gscoped_ptr<KuduWriteOperation> failed_op,
status_(status) {
}
-KuduError::Data::~Data() {
-}
-
} // namespace client
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/error-internal.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/error-internal.h b/src/kudu/client/error-internal.h
index 78ccad4..cfbd2fe 100644
--- a/src/kudu/client/error-internal.h
+++ b/src/kudu/client/error-internal.h
@@ -27,7 +27,7 @@ namespace client {
class KuduError::Data {
public:
Data(gscoped_ptr<KuduWriteOperation> failed_op, const Status& error);
- ~Data();
+ ~Data() = default;
gscoped_ptr<KuduWriteOperation> failed_op_;
Status status_;
http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/error_collector.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/error_collector.cc b/src/kudu/client/error_collector.cc
index 02206cc..db89601 100644
--- a/src/kudu/client/error_collector.cc
+++ b/src/kudu/client/error_collector.cc
@@ -15,41 +15,95 @@
// specific language governing permissions and limitations
// under the License.
-#include "kudu/client/client.h"
#include "kudu/client/error_collector.h"
#include <mutex>
#include <vector>
+#include "kudu/client/client.h"
+#include "kudu/client/error-internal.h"
#include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/status.h"
+
+using strings::Substitute;
namespace kudu {
namespace client {
namespace internal {
+ErrorCollector::ErrorCollector()
+ : max_mem_size_bytes_(kMemSizeNoLimit),
+ mem_size_bytes_(0),
+ dropped_errors_cnt_(0) {
+}
+
ErrorCollector::~ErrorCollector() {
STLDeleteElements(&errors_);
}
+Status ErrorCollector::SetMaxMemSize(size_t size_bytes) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ if (dropped_errors_cnt_ > 0) {
+ // The error collector has dropped some errors already: do not allow
+ // to change the limit on memory size in this case at all. We want
+ // to preserve consistency in the set of accumulated errors -- do not
+ // allow a situation when the accumulated errors contain 'holes'.
+ return Status::IllegalState(
+ "cannot set new limit: already dropped some errors");
+ }
+ if (size_bytes != kMemSizeNoLimit && size_bytes < mem_size_bytes_) {
+ // Do not overflow the error collector post-factum.
+ return Status::IllegalState(
+ Substitute("new limit of $0 bytes: already accumulated errors "
+ "for $1 bytes", size_bytes, mem_size_bytes_));
+ }
+ max_mem_size_bytes_ = size_bytes;
+
+ return Status::OK();
+}
+
void ErrorCollector::AddError(gscoped_ptr<KuduError> error) {
std::lock_guard<simple_spinlock> l(lock_);
+ const size_t error_size_bytes = error->data_->failed_op_->SizeInBuffer();
+
+ // If the maximum limit on the memory size is set, check whether the
+ // total-size-to-be would be greater than the specified limit after adding
+ // a new item into the collection: if yes, then drop this and all subsequent
+ // errors.
+ //
+ // Once an error has been dropped, drop all the subsequent ones even if they
+ // would fit into the available memory -- this is to preserve consistent
+ // sequencing of the accumulated errors.
+ if (max_mem_size_bytes_ != kMemSizeNoLimit &&
+ (dropped_errors_cnt_ > 0 ||
+ error_size_bytes + mem_size_bytes_ > max_mem_size_bytes_)) {
+ ++dropped_errors_cnt_;
+ return;
+ }
+ mem_size_bytes_ += error_size_bytes;
errors_.push_back(error.release());
}
size_t ErrorCollector::CountErrors() const {
std::lock_guard<simple_spinlock> l(lock_);
- return errors_.size();
+ return errors_.size() + dropped_errors_cnt_;
}
-void ErrorCollector::GetErrors(std::vector<KuduError*>* errors, bool* overflowed) {
+void ErrorCollector::GetErrors(std::vector<KuduError*>* errors,
+ bool* overflowed) {
std::lock_guard<simple_spinlock> l(lock_);
+ if (overflowed != nullptr) {
+ *overflowed = (dropped_errors_cnt_ != 0);
+ }
if (errors != nullptr) {
errors->clear();
errors->swap(errors_);
+ } else {
+ STLDeleteElements(&errors_);
}
- if (overflowed != nullptr) {
- *overflowed = false;
- }
+ dropped_errors_cnt_ = 0;
+ mem_size_bytes_ = 0;
}
} // namespace internal
http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/error_collector.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/error_collector.h b/src/kudu/client/error_collector.h
index 8d4d393..33e70c3 100644
--- a/src/kudu/client/error_collector.h
+++ b/src/kudu/client/error_collector.h
@@ -28,6 +28,7 @@
namespace kudu {
namespace client {
+class ClientUnitTest_TestErrorCollector_Test;
class KuduError;
class KuduInsert;
@@ -35,7 +36,12 @@ namespace internal {
class ErrorCollector : public RefCountedThreadSafe<ErrorCollector> {
public:
- ErrorCollector() = default;
+ static const size_t kMemSizeNoLimit = 0;
+
+ ErrorCollector();
+
+ // See KuduSession::SetErrorBufferSpace() for details.
+ Status SetMaxMemSize(size_t size_bytes);
virtual void AddError(gscoped_ptr<KuduError> error);
@@ -49,10 +55,14 @@ class ErrorCollector : public RefCountedThreadSafe<ErrorCollector> {
virtual ~ErrorCollector();
private:
+ friend class ::kudu::client::ClientUnitTest_TestErrorCollector_Test;
friend class RefCountedThreadSafe<ErrorCollector>;
mutable simple_spinlock lock_;
std::vector<KuduError*> errors_;
+ size_t max_mem_size_bytes_;
+ size_t mem_size_bytes_;
+ size_t dropped_errors_cnt_;
DISALLOW_COPY_AND_ASSIGN(ErrorCollector);
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/60aa54e2/src/kudu/client/write_op.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/write_op.h b/src/kudu/client/write_op.h
index 16fe5af..6513eb7 100644
--- a/src/kudu/client/write_op.h
+++ b/src/kudu/client/write_op.h
@@ -31,6 +31,7 @@ namespace client {
namespace internal {
class Batcher;
+class ErrorCollector;
class WriteRpc;
} // namespace internal
@@ -100,6 +101,7 @@ class KUDU_EXPORT KuduWriteOperation {
private:
friend class internal::Batcher;
friend class internal::WriteRpc;
+ friend class internal::ErrorCollector;
// Create and encode the key for this write (key must be set)
//