You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/01/18 02:58:13 UTC

[kudu] branch master updated: [row_operations] minor updates

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f3f919293 [row_operations] minor updates
f3f919293 is described below

commit f3f919293647c6e10b711c5c5d88ee8bd041f078
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Sat Nov 26 11:16:05 2022 -0800

    [row_operations] minor updates
    
    This patch updates the code in row_operations.cc:
      * simplify handling of RowOperationsPB::Type in DecodeOp
      * use std::make_shared where appropriate
      * use move semantics in a few places
      * make assorted style-related changes
    
    Change-Id: Ib1d7c6c28078447638a246a08f137b374cc9dac7
    Reviewed-on: http://gerrit.cloudera.org:8080/19424
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Yifan Zhang <ch...@163.com>
---
 src/kudu/common/row_operations.cc                | 99 +++++++++++-------------
 src/kudu/tablet/tablet_auto_incrementing-test.cc |  4 +-
 src/kudu/tserver/tablet_server-test.cc           | 49 ++++++++----
 3 files changed, 78 insertions(+), 74 deletions(-)

diff --git a/src/kudu/common/row_operations.cc b/src/kudu/common/row_operations.cc
index 43970b789..2c0991502 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -179,8 +179,8 @@ size_t RowOperationsPBEncoder::Add(RowOperationsPB::Type op_type,
 }
 
 void RowOperationsPBEncoder::RemoveLast() {
-  CHECK_NE(string::npos, prev_indirect_data_size_);
-  CHECK_NE(string::npos, prev_rows_size_);
+  DCHECK_NE(string::npos, prev_indirect_data_size_);
+  DCHECK_NE(string::npos, prev_rows_size_);
   pb_->mutable_indirect_data()->resize(prev_indirect_data_size_);
   pb_->mutable_rows()->resize(prev_rows_size_);
   prev_indirect_data_size_ = string::npos;
@@ -231,7 +231,7 @@ Status RowOperationsPBDecoder::ReadOpType(RowOperationsPB::Type* type) {
     return Status::Corruption("Cannot find operation type");
   }
   if (PREDICT_FALSE(!RowOperationsPB_Type_IsValid(src_[0]))) {
-    return Status::Corruption(Substitute("Unknown operation type: $0", src_[0]));
+    return Status::NotSupported(Substitute("Unknown operation type: $0", src_[0]));
   }
   *type = static_cast<RowOperationsPB::Type>(src_[0]);
   src_.remove_prefix(1);
@@ -309,8 +309,7 @@ Status RowOperationsPBDecoder::ReadColumn(const ColumnSchema& col,
 
 Status RowOperationsPBDecoder::ReadColumnAndDiscard(const ColumnSchema& col) {
   uint8_t scratch[kLargestTypeSize];
-  RETURN_NOT_OK(ReadColumn(col, scratch, nullptr));
-  return Status::OK();
+  return ReadColumn(col, scratch, nullptr);
 }
 
 bool RowOperationsPBDecoder::HasNext() const {
@@ -348,7 +347,7 @@ class ClientServerMapping {
                       const Schema* tablet_schema)
     : client_schema_(client_schema),
       tablet_schema_(tablet_schema),
-      saw_tablet_col_(tablet_schema->num_columns()) {
+      saw_tablet_col_(tablet_schema->num_columns(), false) {
   }
 
   Status ProjectBaseColumn(size_t client_col_idx, size_t tablet_col_idx) {
@@ -433,9 +432,9 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
   }
 
   // Allocate a row with the tablet's layout.
-  auto tablet_row_storage = reinterpret_cast<uint8_t*>(
+  auto* tablet_row_storage = reinterpret_cast<uint8_t*>(
       dst_arena_->AllocateBytesAligned(tablet_row_size_, 8));
-  auto tablet_isset_bitmap = reinterpret_cast<uint8_t*>(
+  auto* tablet_isset_bitmap = reinterpret_cast<uint8_t*>(
       dst_arena_->AllocateBytes(BitmapSize(tablet_schema_->num_columns())));
   if (PREDICT_FALSE(!tablet_row_storage || !tablet_isset_bitmap)) {
     return Status::RuntimeError("Out of memory");
@@ -451,7 +450,6 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
 
   // Now handle each of the columns passed by the user, replacing the defaults
   // from the prototype.
-  Status row_status;
   const auto auto_incrementing_col_idx = tablet_schema_->auto_incrementing_col_idx();
   for (size_t client_col_idx = 0;
        client_col_idx < client_schema_->num_columns();
@@ -467,17 +465,20 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
     BitmapChange(tablet_isset_bitmap, tablet_col_idx, isset);
     if (isset) {
       // If the client provided a value for this column, copy it.
-
       // Copy null-ness, if the server side column is nullable.
-      bool client_set_to_null = client_schema_->has_nullables() &&
-        BitmapTest(client_null_map, client_col_idx);
+      const bool client_set_to_null = client_schema_->has_nullables() &&
+          BitmapTest(client_null_map, client_col_idx);
       if (col.is_nullable()) {
         tablet_row.set_null(tablet_col_idx, client_set_to_null);
       }
       if (!client_set_to_null) {
         // Copy the value if it's not null.
-        RETURN_NOT_OK(ReadColumn(col, tablet_row.mutable_cell_ptr(tablet_col_idx), &row_status));
-        if (PREDICT_FALSE(!row_status.ok())) op->SetFailureStatusOnce(row_status);
+        Status row_status;
+        RETURN_NOT_OK(ReadColumn(
+            col, tablet_row.mutable_cell_ptr(tablet_col_idx), &row_status));
+        if (PREDICT_FALSE(!row_status.ok())) {
+          op->SetFailureStatusOnce(row_status);
+        }
       } else if (PREDICT_FALSE(!col.is_nullable())) {
         op->SetFailureStatusOnce(Status::InvalidArgument(
             "NULL values not allowed for non-nullable column", col.ToString()));
@@ -521,8 +522,6 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
 
 Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& mapping,
                                                     DecodedRowOperation* op) {
-  size_t rowkey_size = tablet_schema_->key_byte_size();
-
   const uint8_t* client_isset_map = nullptr;
   const uint8_t* client_null_map = nullptr;
 
@@ -533,8 +532,8 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
   }
 
   // Allocate space for the row key.
-  auto rowkey_storage = reinterpret_cast<uint8_t*>(
-    dst_arena_->AllocateBytesAligned(rowkey_size, 8));
+  auto* rowkey_storage = reinterpret_cast<uint8_t*>(
+      dst_arena_->AllocateBytesAligned(tablet_schema_->key_byte_size(), 8));
   if (PREDICT_FALSE(!rowkey_storage)) {
     return Status::RuntimeError("Out of memory");
   }
@@ -564,7 +563,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
     }
 
     bool client_set_to_null = client_schema_->has_nullables() &&
-      BitmapTest(client_null_map, client_col_idx);
+        BitmapTest(client_null_map, client_col_idx);
     if (PREDICT_FALSE(client_set_to_null)) {
       op->SetFailureStatusOnce(Status::InvalidArgument("NULL values not allowed for key column",
                                                        col.ToString()));
@@ -614,7 +613,9 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
         uint8_t* val_to_add = nullptr;
         if (!client_set_to_null) {
           RETURN_NOT_OK(ReadColumn(col, scratch, &row_status));
-          if (PREDICT_FALSE(!row_status.ok())) op->SetFailureStatusOnce(row_status);
+          if (PREDICT_FALSE(!row_status.ok())) {
+            op->SetFailureStatusOnce(row_status);
+          }
           val_to_add = scratch;
         } else if (PREDICT_FALSE(!col.is_nullable())) {
           op->SetFailureStatusOnce(Status::InvalidArgument(
@@ -635,8 +636,8 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
 
     if (PREDICT_TRUE(op->result.ok())) {
       // Copy the row-changelist to the arena.
-      auto rcl_in_arena = reinterpret_cast<uint8_t*>(
-        dst_arena_->AllocateBytesAligned(buf.size(), 8));
+      auto* rcl_in_arena = reinterpret_cast<uint8_t*>(
+          dst_arena_->AllocateBytesAligned(buf.size(), 8));
       if (PREDICT_FALSE(rcl_in_arena == nullptr)) {
         return Status::RuntimeError("Out of memory allocating RCL");
       }
@@ -653,7 +654,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
             "DELETE should not have a value for column", col.ToString()));
 
         bool client_set_to_null = client_schema_->has_nullables() &&
-          BitmapTest(client_null_map, client_col_idx);
+            BitmapTest(client_null_map, client_col_idx);
         if (!client_set_to_null || !col.is_nullable()) {
           RETURN_NOT_OK(ReadColumnAndDiscard(col));
         }
@@ -671,7 +672,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
 
 Status RowOperationsPBDecoder::DecodeSplitRow(const ClientServerMapping& mapping,
                                               DecodedRowOperation* op) {
-  op->split_row.reset(new KuduPartialRow(tablet_schema_));
+  op->split_row = std::make_shared<KuduPartialRow>(tablet_schema_);
 
   const uint8_t* client_isset_map;
   const uint8_t* client_null_map;
@@ -696,12 +697,9 @@ Status RowOperationsPBDecoder::DecodeSplitRow(const ClientServerMapping& mapping
       // If the client provided a value for this column, copy it.
       Slice column_slice;
       RETURN_NOT_OK(GetColumnSlice(col, &column_slice, nullptr));
-      const uint8_t* data;
-      if (col.type_info()->physical_type() == BINARY) {
-        data = reinterpret_cast<const uint8_t*>(&column_slice);
-      } else {
-        data = column_slice.data();
-      }
+      const uint8_t* data =  (col.type_info()->physical_type() == BINARY)
+          ? reinterpret_cast<const uint8_t*>(&column_slice)
+          : column_slice.data();
       RETURN_NOT_OK(op->split_row->Set(static_cast<int32_t>(tablet_col_idx), data));
     }
   }
@@ -737,12 +735,12 @@ Status RowOperationsPBDecoder::DecodeOperations(vector<DecodedRowOperation>* ops
   while (HasNext()) {
     RowOperationsPB::Type type = RowOperationsPB::UNKNOWN;
     RETURN_NOT_OK(ReadOpType(&type));
+
     DecodedRowOperation op;
     op.type = type;
-
     RETURN_NOT_OK(DecodeOp<mode>(type, prototype_row_storage, mapping, &op,
                                  auto_incrementing_counter));
-    ops->push_back(op);
+    ops->emplace_back(std::move(op));
   }
 
   return Status::OK();
@@ -754,34 +752,27 @@ Status RowOperationsPBDecoder::DecodeOp<DecoderMode::WRITE_OPS>(
     const ClientServerMapping& mapping, DecodedRowOperation* op,
     int64_t* auto_incrementing_counter) {
   switch (type) {
-    case RowOperationsPB::UNKNOWN:
-      return Status::NotSupported("Unknown row operation type");
     case RowOperationsPB::UPSERT:
-      if (tablet_schema_->has_auto_incrementing()) {
-        return Status::NotSupported("Tables with auto-incrementing column do not support "
-                                    "UPSERT operations");
-      }
     case RowOperationsPB::UPSERT_IGNORE:
       if (tablet_schema_->has_auto_incrementing()) {
-        return Status::NotSupported("Tables with auto-incrementing column do not support "
-                                    "UPSERT_IGNORE operations");
+        return Status::NotSupported(
+            Substitute("tables with auto-incrementing column do not support "
+                       "$0 operations", RowOperationsPB_Type_Name(type)));
       }
     case RowOperationsPB::INSERT:
     case RowOperationsPB::INSERT_IGNORE:
-      RETURN_NOT_OK(DecodeInsertOrUpsert(prototype_row_storage, mapping, op,
-                                         auto_incrementing_counter));
-      break;
+      return DecodeInsertOrUpsert(prototype_row_storage, mapping, op,
+                                  auto_incrementing_counter);
     case RowOperationsPB::UPDATE:
     case RowOperationsPB::UPDATE_IGNORE:
     case RowOperationsPB::DELETE:
     case RowOperationsPB::DELETE_IGNORE:
-      RETURN_NOT_OK(DecodeUpdateOrDelete(mapping, op));
-      break;
+      return DecodeUpdateOrDelete(mapping, op);
     default:
-      return Status::InvalidArgument(Substitute("Invalid write operation type $0",
-                                                RowOperationsPB_Type_Name(type)));
+      break;
   }
-  return Status::OK();
+  return Status::InvalidArgument(Substitute("Invalid write operation type $0",
+                                            RowOperationsPB_Type_Name(type)));
 }
 
 template<>
@@ -790,25 +781,23 @@ Status RowOperationsPBDecoder::DecodeOp<DecoderMode::SPLIT_ROWS>(
     const ClientServerMapping& mapping, DecodedRowOperation* op,
     int64_t* /*auto_incrementing_counter*/) {
   switch (type) {
-    case RowOperationsPB::UNKNOWN:
-      return Status::NotSupported("Unknown row operation type");
     case RowOperationsPB::SPLIT_ROW:
     case RowOperationsPB::RANGE_LOWER_BOUND:
     case RowOperationsPB::RANGE_UPPER_BOUND:
     case RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND:
     case RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND:
-      RETURN_NOT_OK(DecodeSplitRow(mapping, op));
-      break;
+      return DecodeSplitRow(mapping, op);
     default:
-      return Status::InvalidArgument(Substitute("Invalid split row type $0",
-                                                RowOperationsPB_Type_Name(type)));
+      break;
   }
-  return Status::OK();
+  return Status::InvalidArgument(Substitute("Invalid split row type $0",
+                                            RowOperationsPB_Type_Name(type)));
 }
 
 template
 Status RowOperationsPBDecoder::DecodeOperations<DecoderMode::SPLIT_ROWS>(
     vector<DecodedRowOperation>* ops, int64_t* auto_incrementing_counter);
+
 template
 Status RowOperationsPBDecoder::DecodeOperations<DecoderMode::WRITE_OPS>(
     vector<DecodedRowOperation>* ops, int64_t* auto_incrementing_counter);
diff --git a/src/kudu/tablet/tablet_auto_incrementing-test.cc b/src/kudu/tablet/tablet_auto_incrementing-test.cc
index 0ea250ed5..1650f7195 100644
--- a/src/kudu/tablet/tablet_auto_incrementing-test.cc
+++ b/src/kudu/tablet/tablet_auto_incrementing-test.cc
@@ -99,7 +99,7 @@ TEST_F(AutoIncrementingTabletTest, TestUpsertOp) {
   ASSERT_OK(row->SetInt32(1, 1337));
   Status s = writer_->Upsert(*row);
   ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
-  ASSERT_EQ("Not implemented: Tables with auto-incrementing "
+  ASSERT_EQ("Not implemented: tables with auto-incrementing "
             "column do not support UPSERT operations", s.ToString());
 }
 
@@ -110,7 +110,7 @@ TEST_F(AutoIncrementingTabletTest, TestUpsertIgnoreOp) {
   ASSERT_OK(row->SetInt32(1, 1337));
   Status s = writer_->UpsertIgnore(*row);
   ASSERT_TRUE(s.IsNotSupported()) << s.ToString();
-  ASSERT_EQ("Not implemented: Tables with auto-incrementing "
+  ASSERT_EQ("Not implemented: tables with auto-incrementing "
             "column do not support UPSERT_IGNORE operations", s.ToString());
 }
 
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 608c7ecfa..9d2787505 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -1686,13 +1686,19 @@ TEST_F(TabletServerTest, TestInsertAndMutate) {
 // Try sending write requests that do not contain write operations. Make sure
 // we get an error that makes sense.
 TEST_F(TabletServerTest, TestInvalidWriteRequest_WrongOpType) {
-  const vector<RowOperationsPB::Type> wrong_op_types = {
+  const RowOperationsPB::Type wrong_op_types[] = {
     RowOperationsPB::SPLIT_ROW,
     RowOperationsPB::RANGE_LOWER_BOUND,
     RowOperationsPB::RANGE_UPPER_BOUND,
     RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND,
     RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND,
   };
+  const RowOperationsPB::Type unknown_op_types[] = {
+    RowOperationsPB::UNKNOWN,
+  };
+  const RowOperationsPB::Type unsupported_op_types[] = {
+    static_cast<RowOperationsPB::Type>(RowOperationsPB_Type_Type_MAX + 1),
+  };
   const auto send_bad_write = [&] (RowOperationsPB::Type op_type) {
     WriteRequestPB req;
     req.set_tablet_id(kTabletId);
@@ -1706,26 +1712,35 @@ TEST_F(TabletServerTest, TestInvalidWriteRequest_WrongOpType) {
     CHECK_OK(proxy_->Write(req, &resp, &controller));
     return resp;
   };
-  // Send a bunch of op types that are inappropriate for write requests.
-  for (const auto& op_type : wrong_op_types) {
+  const auto check_op_type = [&] (RowOperationsPB::Type op_type,
+                                  AppStatusPB::ErrorCode expected_code,
+                                  const string& expected_msg_substr) {
     WriteResponsePB resp = send_bad_write(op_type);
     SCOPED_TRACE(SecureDebugString(resp));
     ASSERT_TRUE(resp.has_error());
     ASSERT_EQ(TabletServerErrorPB::MISMATCHED_SCHEMA, resp.error().code());
-    ASSERT_EQ(AppStatusPB::INVALID_ARGUMENT, resp.error().status().code());
-    ASSERT_STR_CONTAINS(resp.error().status().message(),
-                        "Invalid write operation type");
-  }
-  {
-    // Do the same for UNKNOWN, which is an unexpected operation type in all
-    // cases, and thus results in a different error message.
-    WriteResponsePB resp = send_bad_write(RowOperationsPB::UNKNOWN);
-    SCOPED_TRACE(SecureDebugString(resp));
-    ASSERT_TRUE(resp.has_error());
-    ASSERT_EQ(TabletServerErrorPB::MISMATCHED_SCHEMA, resp.error().code());
-    ASSERT_EQ(AppStatusPB::NOT_SUPPORTED, resp.error().status().code());
-    ASSERT_STR_CONTAINS(resp.error().status().message(),
-                        "Unknown row operation type");
+    ASSERT_EQ(expected_code, resp.error().status().code());
+    ASSERT_STR_CONTAINS(resp.error().status().message(), expected_msg_substr);
+  };
+
+  // Send a bunch of op types that are inappropriate for write requests.
+  for (const auto& op_type : wrong_op_types) {
+    NO_FATALS(check_op_type(op_type,
+                            AppStatusPB::INVALID_ARGUMENT,
+                            "Invalid write operation type"));
+  }
+  // Do the same for UNKNOWN.
+  for (const auto& op_type : unknown_op_types) {
+    NO_FATALS(check_op_type(op_type,
+                            AppStatusPB::INVALID_ARGUMENT,
+                            "Invalid write operation type"));
+  }
+  // Try one more value which isn't among the defined ones in the
+  // RowOperationsPB::Type enumeration.
+  for (const auto& op_type : unsupported_op_types) {
+    NO_FATALS(check_op_type(op_type,
+                            AppStatusPB::NOT_SUPPORTED,
+                            "Unknown operation type"));
   }
 }