You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/01/18 02:58:13 UTC
[kudu] branch master updated: [row_operations] minor updates
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new f3f919293 [row_operations] minor updates
f3f919293 is described below
commit f3f919293647c6e10b711c5c5d88ee8bd041f078
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sat Nov 26 11:16:05 2022 -0800
[row_operations] minor updates
This patch updates the code in row_operations.cc:
* simplify handling of RowOperationsPB::Type in DecodeOp
* use std::make_shared where appropriate
* use move semantics in a few places
* make assorted style-related changes
Change-Id: Ib1d7c6c28078447638a246a08f137b374cc9dac7
Reviewed-on: http://gerrit.cloudera.org:8080/19424
Tested-by: Alexey Serbin <al...@apache.org>
Reviewed-by: Yifan Zhang <ch...@163.com>
---
src/kudu/common/row_operations.cc | 99 +++++++++++-------------
src/kudu/tablet/tablet_auto_incrementing-test.cc | 4 +-
src/kudu/tserver/tablet_server-test.cc | 49 ++++++++----
3 files changed, 78 insertions(+), 74 deletions(-)
diff --git a/src/kudu/common/row_operations.cc b/src/kudu/common/row_operations.cc
index 43970b789..2c0991502 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -179,8 +179,8 @@ size_t RowOperationsPBEncoder::Add(RowOperationsPB::Type op_type,
}
void RowOperationsPBEncoder::RemoveLast() {
- CHECK_NE(string::npos, prev_indirect_data_size_);
- CHECK_NE(string::npos, prev_rows_size_);
+ DCHECK_NE(string::npos, prev_indirect_data_size_);
+ DCHECK_NE(string::npos, prev_rows_size_);
pb_->mutable_indirect_data()->resize(prev_indirect_data_size_);
pb_->mutable_rows()->resize(prev_rows_size_);
prev_indirect_data_size_ = string::npos;
@@ -231,7 +231,7 @@ Status RowOperationsPBDecoder::ReadOpType(RowOperationsPB::Type* type) {
return Status::Corruption("Cannot find operation type");
}
if (PREDICT_FALSE(!RowOperationsPB_Type_IsValid(src_[0]))) {
- return Status::Corruption(Substitute("Unknown operation type: $0", src_[0]));
+ return Status::NotSupported(Substitute("Unknown operation type: $0", src_[0]));
}
*type = static_cast<RowOperationsPB::Type>(src_[0]);
src_.remove_prefix(1);
@@ -309,8 +309,7 @@ Status RowOperationsPBDecoder::ReadColumn(const ColumnSchema& col,
Status RowOperationsPBDecoder::ReadColumnAndDiscard(const ColumnSchema& col) {
uint8_t scratch[kLargestTypeSize];
- RETURN_NOT_OK(ReadColumn(col, scratch, nullptr));
- return Status::OK();
+ return ReadColumn(col, scratch, nullptr);
}
bool RowOperationsPBDecoder::HasNext() const {
@@ -348,7 +347,7 @@ class ClientServerMapping {
const Schema* tablet_schema)
: client_schema_(client_schema),
tablet_schema_(tablet_schema),
- saw_tablet_col_(tablet_schema->num_columns()) {
+ saw_tablet_col_(tablet_schema->num_columns(), false) {
}
Status ProjectBaseColumn(size_t client_col_idx, size_t tablet_col_idx) {
@@ -433,9 +432,9 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
}
// Allocate a row with the tablet's layout.
- auto tablet_row_storage = reinterpret_cast<uint8_t*>(
+ auto* tablet_row_storage = reinterpret_cast<uint8_t*>(
dst_arena_->AllocateBytesAligned(tablet_row_size_, 8));
- auto tablet_isset_bitmap = reinterpret_cast<uint8_t*>(
+ auto* tablet_isset_bitmap = reinterpret_cast<uint8_t*>(
dst_arena_->AllocateBytes(BitmapSize(tablet_schema_->num_columns())));
if (PREDICT_FALSE(!tablet_row_storage || !tablet_isset_bitmap)) {
return Status::RuntimeError("Out of memory");
@@ -451,7 +450,6 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
// Now handle each of the columns passed by the user, replacing the defaults
// from the prototype.
- Status row_status;
const auto auto_incrementing_col_idx = tablet_schema_->auto_incrementing_col_idx();
for (size_t client_col_idx = 0;
client_col_idx < client_schema_->num_columns();
@@ -467,17 +465,20 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
BitmapChange(tablet_isset_bitmap, tablet_col_idx, isset);
if (isset) {
// If the client provided a value for this column, copy it.
-
// Copy null-ness, if the server side column is nullable.
- bool client_set_to_null = client_schema_->has_nullables() &&
- BitmapTest(client_null_map, client_col_idx);
+ const bool client_set_to_null = client_schema_->has_nullables() &&
+ BitmapTest(client_null_map, client_col_idx);
if (col.is_nullable()) {
tablet_row.set_null(tablet_col_idx, client_set_to_null);
}
if (!client_set_to_null) {
// Copy the value if it's not null.
- RETURN_NOT_OK(ReadColumn(col, tablet_row.mutable_cell_ptr(tablet_col_idx), &row_status));
- if (PREDICT_FALSE(!row_status.ok())) op->SetFailureStatusOnce(row_status);
+ Status row_status;
+ RETURN_NOT_OK(ReadColumn(
+ col, tablet_row.mutable_cell_ptr(tablet_col_idx), &row_status));
+ if (PREDICT_FALSE(!row_status.ok())) {
+ op->SetFailureStatusOnce(row_status);
+ }
} else if (PREDICT_FALSE(!col.is_nullable())) {
op->SetFailureStatusOnce(Status::InvalidArgument(
"NULL values not allowed for non-nullable column", col.ToString()));
@@ -521,8 +522,6 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& mapping,
DecodedRowOperation* op) {
- size_t rowkey_size = tablet_schema_->key_byte_size();
-
const uint8_t* client_isset_map = nullptr;
const uint8_t* client_null_map = nullptr;
@@ -533,8 +532,8 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
}
// Allocate space for the row key.
- auto rowkey_storage = reinterpret_cast<uint8_t*>(
- dst_arena_->AllocateBytesAligned(rowkey_size, 8));
+ auto* rowkey_storage = reinterpret_cast<uint8_t*>(
+ dst_arena_->AllocateBytesAligned(tablet_schema_->key_byte_size(), 8));
if (PREDICT_FALSE(!rowkey_storage)) {
return Status::RuntimeError("Out of memory");
}
@@ -564,7 +563,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
}
bool client_set_to_null = client_schema_->has_nullables() &&
- BitmapTest(client_null_map, client_col_idx);
+ BitmapTest(client_null_map, client_col_idx);
if (PREDICT_FALSE(client_set_to_null)) {
op->SetFailureStatusOnce(Status::InvalidArgument("NULL values not allowed for key column",
col.ToString()));
@@ -614,7 +613,9 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
uint8_t* val_to_add = nullptr;
if (!client_set_to_null) {
RETURN_NOT_OK(ReadColumn(col, scratch, &row_status));
- if (PREDICT_FALSE(!row_status.ok())) op->SetFailureStatusOnce(row_status);
+ if (PREDICT_FALSE(!row_status.ok())) {
+ op->SetFailureStatusOnce(row_status);
+ }
val_to_add = scratch;
} else if (PREDICT_FALSE(!col.is_nullable())) {
op->SetFailureStatusOnce(Status::InvalidArgument(
@@ -635,8 +636,8 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
if (PREDICT_TRUE(op->result.ok())) {
// Copy the row-changelist to the arena.
- auto rcl_in_arena = reinterpret_cast<uint8_t*>(
- dst_arena_->AllocateBytesAligned(buf.size(), 8));
+ auto* rcl_in_arena = reinterpret_cast<uint8_t*>(
+ dst_arena_->AllocateBytesAligned(buf.size(), 8));
if (PREDICT_FALSE(rcl_in_arena == nullptr)) {
return Status::RuntimeError("Out of memory allocating RCL");
}
@@ -653,7 +654,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
"DELETE should not have a value for column", col.ToString()));
bool client_set_to_null = client_schema_->has_nullables() &&
- BitmapTest(client_null_map, client_col_idx);
+ BitmapTest(client_null_map, client_col_idx);
if (!client_set_to_null || !col.is_nullable()) {
RETURN_NOT_OK(ReadColumnAndDiscard(col));
}
@@ -671,7 +672,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
Status RowOperationsPBDecoder::DecodeSplitRow(const ClientServerMapping& mapping,
DecodedRowOperation* op) {
- op->split_row.reset(new KuduPartialRow(tablet_schema_));
+ op->split_row = std::make_shared<KuduPartialRow>(tablet_schema_);
const uint8_t* client_isset_map;
const uint8_t* client_null_map;
@@ -696,12 +697,9 @@ Status RowOperationsPBDecoder::DecodeSplitRow(const ClientServerMapping& mapping
// If the client provided a value for this column, copy it.
Slice column_slice;
RETURN_NOT_OK(GetColumnSlice(col, &column_slice, nullptr));
- const uint8_t* data;
- if (col.type_info()->physical_type() == BINARY) {
- data = reinterpret_cast<const uint8_t*>(&column_slice);
- } else {
- data = column_slice.data();
- }
+ const uint8_t* data = (col.type_info()->physical_type() == BINARY)
+ ? reinterpret_cast<const uint8_t*>(&column_slice)
+ : column_slice.data();
RETURN_NOT_OK(op->split_row->Set(static_cast<int32_t>(tablet_col_idx), data));
}
}
@@ -737,12 +735,12 @@ Status RowOperationsPBDecoder::DecodeOperations(vector<DecodedRowOperation>* ops
while (HasNext()) {
RowOperationsPB::Type type = RowOperationsPB::UNKNOWN;
RETURN_NOT_OK(ReadOpType(&type));
+
DecodedRowOperation op;
op.type = type;
-
RETURN_NOT_OK(DecodeOp<mode>(type, prototype_row_storage, mapping, &op,
auto_incrementing_counter));
- ops->push_back(op);
+ ops->emplace_back(std::move(op));
}
return Status::OK();
@@ -754,34 +752,27 @@ Status RowOperationsPBDecoder::DecodeOp<DecoderMode::WRITE_OPS>(
const ClientServerMapping& mapping, DecodedRowOperation* op,
int64_t* auto_incrementing_counter) {
switch (type) {
- case RowOperationsPB::UNKNOWN:
- return Status::NotSupported("Unknown row operation type");
case RowOperationsPB::UPSERT:
- if (tablet_schema_->has_auto_incrementing()) {
- return Status::NotSupported("Tables with auto-incrementing column do not support "
- "UPSERT operations");
- }
case RowOperationsPB::UPSERT_IGNORE:
if (tablet_schema_->has_auto_incrementing()) {
- return Status::NotSupported("Tables with auto-incrementing column do not support "
- "UPSERT_IGNORE operations");
+ return Status::NotSupported(
+ Substitute("tables with auto-incrementing column do not support "
+ "$0 operations", RowOperationsPB_Type_Name(type)));
}
case RowOperationsPB::INSERT:
case RowOperationsPB::INSERT_IGNORE:
- RETURN_NOT_OK(DecodeInsertOrUpsert(prototype_row_storage, mapping, op,
- auto_incrementing_counter));
- break;
+ return DecodeInsertOrUpsert(prototype_row_storage, mapping, op,
+ auto_incrementing_counter);
case RowOperationsPB::UPDATE:
case RowOperationsPB::UPDATE_IGNORE:
case RowOperationsPB::DELETE:
case RowOperationsPB::DELETE_IGNORE:
- RETURN_NOT_OK(DecodeUpdateOrDelete(mapping, op));
- break;
+ return DecodeUpdateOrDelete(mapping, op);
default:
- return Status::InvalidArgument(Substitute("Invalid write operation type $0",
- RowOperationsPB_Type_Name(type)));
+ break;
}
- return Status::OK();
+ return Status::InvalidArgument(Substitute("Invalid write operation type $0",
+ RowOperationsPB_Type_Name(type)));
}
template<>
@@ -790,25 +781,23 @@ Status RowOperationsPBDecoder::DecodeOp<DecoderMode::SPLIT_ROWS>(
const ClientServerMapping& mapping, DecodedRowOperation* op,
int64_t* /*auto_incrementing_counter*/) {
switch (type) {
- case RowOperationsPB::UNKNOWN:
- return Status::NotSupported("Unknown row operation type");
case RowOperationsPB::SPLIT_ROW:
case RowOperationsPB::RANGE_LOWER_BOUND:
case RowOperationsPB::RANGE_UPPER_BOUND:
case RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND:
case RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND:
- RETURN_NOT_OK(DecodeSplitRow(mapping, op));
- break;
+ return DecodeSplitRow(mapping, op);
default:
- return Status::InvalidArgument(Substitute("Invalid split row type $0",
- RowOperationsPB_Type_Name(type)));
+ break;
}
- return Status::OK();
+ return Status::InvalidArgument(Substitute("Invalid split row type $0",
+ RowOperationsPB_Type_Name(type)));
}
template
Status RowOperationsPBDecoder::DecodeOperations<DecoderMode::SPLIT_ROWS>(
vector<DecodedRowOperation>* ops, int64_t* auto_incrementing_counter);
+
template
Status RowOperationsPBDecoder::DecodeOperations<DecoderMode::WRITE_OPS>(
vector<DecodedRowOperation>* ops, int64_t* auto_incrementing_counter);
diff --git a/src/kudu/tablet/tablet_auto_incrementing-test.cc b/src/kudu/tablet/tablet_auto_incrementing-test.cc
index 0ea250ed5..1650f7195 100644
--- a/src/kudu/tablet/tablet_auto_incrementing-test.cc
+++ b/src/kudu/tablet/tablet_auto_incrementing-test.cc
@@ -99,7 +99,7 @@ TEST_F(AutoIncrementingTabletTest, TestUpsertOp) {
ASSERT_OK(row->SetInt32(1, 1337));
Status s = writer_->Upsert(*row);
ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
- ASSERT_EQ("Not implemented: Tables with auto-incrementing "
+ ASSERT_EQ("Not implemented: tables with auto-incrementing "
"column do not support UPSERT operations", s.ToString());
}
@@ -110,7 +110,7 @@ TEST_F(AutoIncrementingTabletTest, TestUpsertIgnoreOp) {
ASSERT_OK(row->SetInt32(1, 1337));
Status s = writer_->UpsertIgnore(*row);
ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
- ASSERT_EQ("Not implemented: Tables with auto-incrementing "
+ ASSERT_EQ("Not implemented: tables with auto-incrementing "
"column do not support UPSERT_IGNORE operations", s.ToString());
}
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 608c7ecfa..9d2787505 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -1686,13 +1686,19 @@ TEST_F(TabletServerTest, TestInsertAndMutate) {
// Try sending write requests that do not contain write operations. Make sure
// we get an error that makes sense.
TEST_F(TabletServerTest, TestInvalidWriteRequest_WrongOpType) {
- const vector<RowOperationsPB::Type> wrong_op_types = {
+ const RowOperationsPB::Type wrong_op_types[] = {
RowOperationsPB::SPLIT_ROW,
RowOperationsPB::RANGE_LOWER_BOUND,
RowOperationsPB::RANGE_UPPER_BOUND,
RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND,
RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND,
};
+ const RowOperationsPB::Type unknown_op_types[] = {
+ RowOperationsPB::UNKNOWN,
+ };
+ const RowOperationsPB::Type unsupported_op_types[] = {
+ static_cast<RowOperationsPB::Type>(RowOperationsPB_Type_Type_MAX + 1),
+ };
const auto send_bad_write = [&] (RowOperationsPB::Type op_type) {
WriteRequestPB req;
req.set_tablet_id(kTabletId);
@@ -1706,26 +1712,35 @@ TEST_F(TabletServerTest, TestInvalidWriteRequest_WrongOpType) {
CHECK_OK(proxy_->Write(req, &resp, &controller));
return resp;
};
- // Send a bunch of op types that are inappropriate for write requests.
- for (const auto& op_type : wrong_op_types) {
+ const auto check_op_type = [&] (RowOperationsPB::Type op_type,
+ AppStatusPB::ErrorCode expected_code,
+ const string& expected_msg_substr) {
WriteResponsePB resp = send_bad_write(op_type);
SCOPED_TRACE(SecureDebugString(resp));
ASSERT_TRUE(resp.has_error());
ASSERT_EQ(TabletServerErrorPB::MISMATCHED_SCHEMA, resp.error().code());
- ASSERT_EQ(AppStatusPB::INVALID_ARGUMENT, resp.error().status().code());
- ASSERT_STR_CONTAINS(resp.error().status().message(),
- "Invalid write operation type");
- }
- {
- // Do the same for UNKNOWN, which is an unexpected operation type in all
- // cases, and thus results in a different error message.
- WriteResponsePB resp = send_bad_write(RowOperationsPB::UNKNOWN);
- SCOPED_TRACE(SecureDebugString(resp));
- ASSERT_TRUE(resp.has_error());
- ASSERT_EQ(TabletServerErrorPB::MISMATCHED_SCHEMA, resp.error().code());
- ASSERT_EQ(AppStatusPB::NOT_SUPPORTED, resp.error().status().code());
- ASSERT_STR_CONTAINS(resp.error().status().message(),
- "Unknown row operation type");
+ ASSERT_EQ(expected_code, resp.error().status().code());
+ ASSERT_STR_CONTAINS(resp.error().status().message(), expected_msg_substr);
+ };
+
+ // Send a bunch of op types that are inappropriate for write requests.
+ for (const auto& op_type : wrong_op_types) {
+ NO_FATALS(check_op_type(op_type,
+ AppStatusPB::INVALID_ARGUMENT,
+ "Invalid write operation type"));
+ }
+ // Do the same for UNKNOWN.
+ for (const auto& op_type : unknown_op_types) {
+ NO_FATALS(check_op_type(op_type,
+ AppStatusPB::INVALID_ARGUMENT,
+ "Invalid write operation type"));
+ }
+ // Try one more value which isn't among the defined ones in the
+ // RowOperationsPB::Type enumeration.
+ for (const auto& op_type : unsupported_op_types) {
+ NO_FATALS(check_op_type(op_type,
+ AppStatusPB::NOT_SUPPORTED,
+ "Unknown operation type"));
}
}