You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/08/12 19:48:58 UTC

[kudu] branch master updated (76b80ec -> 68a4e74)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 76b80ec  Prepare for upgrading to Hive 3
     new 6f04bc1  Remove redundant references of MasterServiceProxy
     new 68a4e74  [tserver] Move cell and probe key size checking to prepare phase

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/clock/clock.h                 |   1 +
 src/kudu/common/row_operations-test.cc | 165 +++++++++++++++++++++++++++++++++
 src/kudu/common/row_operations.cc      |  41 ++++++--
 src/kudu/common/row_operations.h       |  10 +-
 src/kudu/tablet/row_op.cc              |   3 -
 src/kudu/tablet/row_op.h               |   4 +-
 src/kudu/tablet/tablet.cc              |  88 +++++-------------
 src/kudu/tablet/tablet.h               |  17 ++--
 src/kudu/tablet/tablet_bootstrap.cc    |   2 +-
 src/kudu/tools/tool_action_master.cc   |   1 -
 10 files changed, 242 insertions(+), 90 deletions(-)


[kudu] 01/02: Remove redundant references of MasterServiceProxy

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 6f04bc1edd3e652013af7f67c8ec0a7df41cc224
Author: honeyhexin <ho...@sohu.com>
AuthorDate: Mon Aug 12 21:24:09 2019 +0800

    Remove redundant references of MasterServiceProxy
    
    Change-Id: Ie6b73b347c566b0f93130b775dac15b646d0219a
    Reviewed-on: http://gerrit.cloudera.org:8080/14047
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tools/tool_action_master.cc | 1 -
 1 file changed, 1 deletion(-)

diff --git a/src/kudu/tools/tool_action_master.cc b/src/kudu/tools/tool_action_master.cc
index 9be7f3e..b4ec058 100644
--- a/src/kudu/tools/tool_action_master.cc
+++ b/src/kudu/tools/tool_action_master.cc
@@ -59,7 +59,6 @@ using kudu::master::ListMastersRequestPB;
 using kudu::master::ListMastersResponsePB;
 using kudu::master::Master;
 using kudu::master::MasterServiceProxy;
-using kudu::master::MasterServiceProxy;
 using kudu::master::ResetAuthzCacheRequestPB;
 using kudu::master::ResetAuthzCacheResponsePB;
 using kudu::consensus::RaftPeerPB;


[kudu] 02/02: [tserver] Move cell and probe key size checking to prepare phase

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 68a4e7492367e36fc117969b4c39f666d66d8f63
Author: Yingchun Lai <40...@qq.com>
AuthorDate: Sat Jul 27 15:15:07 2019 +0800

    [tserver] Move cell and probe key size checking to prepare phase
    
    It's inefficient to check cell and probe key size in apply phase,
    because we have to lock row key and construct new Slices for every
    binary type of columns.
    This patch move this lightweight checking to prepare phase when
    decoding values from protobuf and before locking row keys.
    
    Change-Id: Id3e672272bb1dcf2d0ac1d96ee8a1a2d1489774c
    Reviewed-on: http://gerrit.cloudera.org:8080/13470
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Todd Lipcon <to...@apache.org>
---
 src/kudu/clock/clock.h                 |   1 +
 src/kudu/common/row_operations-test.cc | 165 +++++++++++++++++++++++++++++++++
 src/kudu/common/row_operations.cc      |  41 ++++++--
 src/kudu/common/row_operations.h       |  10 +-
 src/kudu/tablet/row_op.cc              |   3 -
 src/kudu/tablet/row_op.h               |   4 +-
 src/kudu/tablet/tablet.cc              |  88 +++++-------------
 src/kudu/tablet/tablet.h               |  17 ++--
 src/kudu/tablet/tablet_bootstrap.cc    |   2 +-
 9 files changed, 242 insertions(+), 89 deletions(-)

diff --git a/src/kudu/clock/clock.h b/src/kudu/clock/clock.h
index 77b0cca..b4a199a 100644
--- a/src/kudu/clock/clock.h
+++ b/src/kudu/clock/clock.h
@@ -77,6 +77,7 @@ class Clock : public RefCountedThreadSafe<Clock> {
   // that HasPhysicalComponent() return true, otherwise it will crash.
   virtual MonoDelta GetPhysicalComponentDifference(Timestamp /*lhs*/, Timestamp /*rhs*/) const {
     LOG(FATAL) << "Clock's timestamps don't have a physical component.";
+    __builtin_unreachable();
   }
 
   // Update the clock with a transaction timestamp originating from
diff --git a/src/kudu/common/row_operations-test.cc b/src/kudu/common/row_operations-test.cc
index 6741fdb..d28a490 100644
--- a/src/kudu/common/row_operations-test.cc
+++ b/src/kudu/common/row_operations-test.cc
@@ -24,6 +24,7 @@
 #include <string>
 #include <vector>
 
+#include <gflags/gflags_declare.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -36,6 +37,7 @@
 #include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
@@ -46,6 +48,9 @@
 using std::shared_ptr;
 using std::string;
 using std::vector;
+using strings::Substitute;
+
+DECLARE_int32(max_cell_size_bytes);
 
 namespace kudu {
 
@@ -806,4 +811,164 @@ TEST_F(RowOperationsTest, SplitKeyRoundTrip) {
   CHECK(!row2->IsColumnSet("missing"));
 }
 
+namespace {
+void CheckExceedCellLimit(const Schema& client_schema,
+                          const vector<string>& col_values,
+                          RowOperationsPB::Type op_type,
+                          const Status& expect_status,
+                          const string& expect_msg) {
+  ASSERT_EQ(client_schema.num_columns(), col_values.size());
+
+  // Fill the row.
+  KuduPartialRow row(&client_schema);
+  for (size_t i = 0; i < client_schema.num_columns(); ++i) {
+    if (op_type == RowOperationsPB::DELETE && i >= client_schema.num_key_columns()) {
+      // DELETE should not have a value for non-key column.
+      break;
+    }
+    const ColumnSchema& column_schema = client_schema.column(i);
+    switch (column_schema.type_info()->type()) {
+      case STRING:
+        ASSERT_OK(row.SetStringNoCopy(static_cast<int>(i), col_values[i]));
+        break;
+      case BINARY:
+        ASSERT_OK(row.SetBinaryNoCopy(static_cast<int>(i), col_values[i]));
+        break;
+      default:
+        ASSERT_TRUE(false) << "Unsupported type: " << column_schema.ToString();
+    }
+  }
+
+  RowOperationsPB pb;
+  RowOperationsPBEncoder(&pb).Add(op_type, row);
+
+  Arena arena(1024*1024);
+  Schema schema = client_schema.CopyWithColumnIds();
+  RowOperationsPBDecoder decoder(&pb, &client_schema, &schema, &arena);
+  vector<DecodedRowOperation> ops;
+  Status s;
+  switch (op_type) {
+    case RowOperationsPB::INSERT:
+    case RowOperationsPB::UPDATE:
+    case RowOperationsPB::DELETE:
+    case RowOperationsPB::UPSERT:
+      s = decoder.DecodeOperations<WRITE_OPS>(&ops);
+      break;
+    case RowOperationsPB::SPLIT_ROW:
+    case RowOperationsPB::RANGE_LOWER_BOUND:
+    case RowOperationsPB::RANGE_UPPER_BOUND:
+    case RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND:
+    case RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND:
+      s = decoder.DecodeOperations<SPLIT_ROWS>(&ops);
+      break;
+    default:
+      ASSERT_TRUE(false) << "Unsupported op_type " << op_type;
+  }
+  ASSERT_OK(s);
+  for (const auto& op : ops) {
+    ASSERT_EQ(op.result.CodeAsString(), expect_status.CodeAsString());
+    ASSERT_STR_CONTAINS(op.result.ToString(), expect_msg);
+  }
+}
+
+void CheckInsertUpsertExceedCellLimit(const Schema& client_schema,
+                                      const vector<string>& col_values,
+                                      const Status& expect_status,
+                                      const string& expect_msg) {
+  for (auto op_type : { RowOperationsPB::INSERT,
+                        RowOperationsPB::UPSERT }) {
+    NO_FATALS(CheckExceedCellLimit(client_schema, col_values, op_type, expect_status, expect_msg));
+  }
+}
+
+void CheckUpdateDeleteExceedCellLimit(const Schema& client_schema,
+                                      const vector<string>& col_values,
+                                      const Status& expect_status,
+                                      const string& expect_msg) {
+  for (auto op_type : { RowOperationsPB::UPDATE,
+                        RowOperationsPB::DELETE }) {
+    NO_FATALS(CheckExceedCellLimit(client_schema, col_values, op_type, expect_status, expect_msg));
+  }
+}
+
+void CheckWriteExceedCellLimit(const Schema& client_schema,
+                               const vector<string>& col_values,
+                               const Status& expect_status,
+                               const string& expect_msg) {
+  NO_FATALS(CheckInsertUpsertExceedCellLimit(client_schema, col_values, expect_status, expect_msg));
+  NO_FATALS(CheckUpdateDeleteExceedCellLimit(client_schema, col_values, expect_status, expect_msg));
+}
+
+void CheckSplitExceedCellLimit(const Schema& client_schema,
+                               const vector<string>& col_values,
+                               const Status& expect_status,
+                               const string& expect_msg) {
+  for (auto op_type : { RowOperationsPB::SPLIT_ROW,
+                        RowOperationsPB::RANGE_LOWER_BOUND,
+                        RowOperationsPB::RANGE_UPPER_BOUND,
+                        RowOperationsPB::EXCLUSIVE_RANGE_LOWER_BOUND,
+                        RowOperationsPB::INCLUSIVE_RANGE_UPPER_BOUND }) {
+    NO_FATALS(CheckExceedCellLimit(client_schema, col_values, op_type, expect_status, expect_msg));
+  }
+}
+} // anonymous namespace
+
+TEST_F(RowOperationsTest, ExceedCellLimit) {
+  Schema client_schema = Schema({ ColumnSchema("key_string", STRING),
+                                  ColumnSchema("key_binary", BINARY),
+                                  ColumnSchema("col_string", STRING),
+                                  ColumnSchema("col_binary", BINARY) },
+                                2);
+
+  const string long_string(static_cast<size_t>(FLAGS_max_cell_size_bytes), 'a');
+  const string too_long_string(static_cast<size_t>(FLAGS_max_cell_size_bytes + 1), 'a');
+  const vector<string> base_col_values(4, long_string);
+
+  // All column cell sizes are not exceed.
+  NO_FATALS(CheckWriteExceedCellLimit(client_schema, base_col_values, Status::OK(), ""));
+
+  // Some column cell size exceed for INSERT and UPSERT operation.
+  for (size_t i = 0; i < client_schema.num_columns(); ++i) {
+    auto col_values(base_col_values);
+    col_values[i] = too_long_string;
+    NO_FATALS(CheckInsertUpsertExceedCellLimit(client_schema,
+                                               col_values,
+                                               Status::InvalidArgument(""),
+                                               Substitute("value too large for column '$0'"
+                                                          " ($1 bytes, maximum is $2 bytes)",
+                                                          client_schema.column(i).name(),
+                                                          FLAGS_max_cell_size_bytes + 1,
+                                                          FLAGS_max_cell_size_bytes)));
+  }
+
+  // Key column cell size exceed for UPDATE and DELETE operation, it's OK.
+  for (size_t i = 0; i < client_schema.num_key_columns(); ++i) {
+    auto col_values(base_col_values);
+    col_values[i] = too_long_string;
+    NO_FATALS(CheckUpdateDeleteExceedCellLimit(client_schema, col_values, Status::OK(), ""));
+  }
+
+  // Non-key column cell size exceed for UPDATE operation.
+  for (size_t i = client_schema.num_key_columns(); i < client_schema.num_columns(); ++i) {
+    auto col_values(base_col_values);
+    col_values[i] = too_long_string;
+    NO_FATALS(CheckExceedCellLimit(client_schema,
+                                   col_values,
+                                   RowOperationsPB::UPDATE,
+                                   Status::InvalidArgument(""),
+                                   Substitute("value too large for column '$0'"
+                                              " ($1 bytes, maximum is $2 bytes)",
+                                              client_schema.column(i).name(),
+                                              FLAGS_max_cell_size_bytes + 1,
+                                              FLAGS_max_cell_size_bytes)));
+  }
+
+  // Some column cell size exceed for SPLIT_ROW type operation, it's OK.
+  for (size_t i = 0; i < client_schema.num_columns(); ++i) {
+    auto col_values(base_col_values);
+    col_values[i] = too_long_string;
+    NO_FATALS(CheckSplitExceedCellLimit(client_schema, col_values, Status::OK(), ""));
+  }
+}
+
 } // namespace kudu
diff --git a/src/kudu/common/row_operations.cc b/src/kudu/common/row_operations.cc
index 166f559..0603b3b 100644
--- a/src/kudu/common/row_operations.cc
+++ b/src/kudu/common/row_operations.cc
@@ -22,6 +22,7 @@
 #include <string>
 #include <utility>
 
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 
 #include "kudu/common/common.pb.h"
@@ -34,6 +35,7 @@
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/flag_tags.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/safe_math.h"
@@ -43,6 +45,12 @@ using std::string;
 using std::vector;
 using strings::Substitute;
 
+DEFINE_int32(max_cell_size_bytes, 64 * 1024,
+             "The maximum size of any individual cell in a table. Attempting to store "
+             "string or binary columns with a size greater than this will result "
+             "in errors.");
+TAG_FLAG(max_cell_size_bytes, unsafe);
+
 namespace kudu {
 
 string DecodedRowOperation::ToString(const Schema& schema) const {
@@ -202,7 +210,9 @@ Status RowOperationsPBDecoder::ReadNullBitmap(const uint8_t** null_bm) {
   return Status::OK();
 }
 
-Status RowOperationsPBDecoder::GetColumnSlice(const ColumnSchema& col, Slice* slice) {
+Status RowOperationsPBDecoder::GetColumnSlice(const ColumnSchema& col,
+                                              Slice* slice,
+                                              Status* row_status) {
   size_t size = col.type_info()->size();
   if (PREDICT_FALSE(src_.size() < size)) {
     return Status::Corruption("Not enough data for column", col.ToString());
@@ -219,6 +229,15 @@ Status RowOperationsPBDecoder::GetColumnSlice(const ColumnSchema& col, Slice* sl
       return Status::Corruption("Bad indirect slice");
     }
 
+    // Check that no individual cell is larger than the specified max.
+    if (PREDICT_FALSE(row_status && ptr_slice->size() > FLAGS_max_cell_size_bytes)) {
+      *row_status = Status::InvalidArgument(Substitute(
+          "value too large for column '$0' ($1 bytes, maximum is $2 bytes)",
+          col.name(), ptr_slice->size(), FLAGS_max_cell_size_bytes));
+      // After one row's column size has been found to exceed the limit and has been recorded
+      // in 'row_status', we will consider it OK and continue to consume data in order to properly
+      // validate subsequent columns and rows.
+    }
     *slice = Slice(&pb_->indirect_data()[offset_in_indirect], ptr_slice->size());
   } else {
     *slice = Slice(src_.data(), size);
@@ -227,9 +246,11 @@ Status RowOperationsPBDecoder::GetColumnSlice(const ColumnSchema& col, Slice* sl
   return Status::OK();
 }
 
-Status RowOperationsPBDecoder::ReadColumn(const ColumnSchema& col, uint8_t* dst) {
+Status RowOperationsPBDecoder::ReadColumn(const ColumnSchema& col,
+                                          uint8_t* dst,
+                                          Status* row_status) {
   Slice slice;
-  RETURN_NOT_OK(GetColumnSlice(col, &slice));
+  RETURN_NOT_OK(GetColumnSlice(col, &slice, row_status));
   if (col.type_info()->physical_type() == BINARY) {
     memcpy(dst, &slice, col.type_info()->size());
   } else {
@@ -240,7 +261,7 @@ Status RowOperationsPBDecoder::ReadColumn(const ColumnSchema& col, uint8_t* dst)
 
 Status RowOperationsPBDecoder::ReadColumnAndDiscard(const ColumnSchema& col) {
   uint8_t scratch[kLargestTypeSize];
-  RETURN_NOT_OK(ReadColumn(col, scratch));
+  RETURN_NOT_OK(ReadColumn(col, scratch, nullptr));
   return Status::OK();
 }
 
@@ -373,6 +394,7 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
 
   // Now handle each of the columns passed by the user, replacing the defaults
   // from the prototype.
+  Status row_status;
   for (size_t client_col_idx = 0;
        client_col_idx < client_schema_->num_columns();
        client_col_idx++) {
@@ -395,7 +417,8 @@ Status RowOperationsPBDecoder::DecodeInsertOrUpsert(const uint8_t* prototype_row
       }
       if (!client_set_to_null) {
         // Copy the value if it's not null.
-        RETURN_NOT_OK(ReadColumn(col, tablet_row.mutable_cell_ptr(tablet_col_idx)));
+        RETURN_NOT_OK(ReadColumn(col, tablet_row.mutable_cell_ptr(tablet_col_idx), &row_status));
+        if (PREDICT_FALSE(!row_status.ok())) op->SetFailureStatusOnce(row_status);
       } else if (PREDICT_FALSE(!col.is_nullable())) {
         op->SetFailureStatusOnce(Status::InvalidArgument(
             "NULL values not allowed for non-nullable column", col.ToString()));
@@ -467,7 +490,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
       continue;
     }
 
-    RETURN_NOT_OK(ReadColumn(col, rowkey.mutable_cell_ptr(tablet_col_idx)));
+    RETURN_NOT_OK(ReadColumn(col, rowkey.mutable_cell_ptr(tablet_col_idx), nullptr));
   }
   op->row_data = rowkey_storage;
 
@@ -475,6 +498,7 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
   // For UPDATE, we expect at least one other column to be set, indicating the
   // update to perform.
   // For DELETE, we expect no other columns to be set (and we verify that).
+  Status row_status;
   if (op->type == RowOperationsPB::UPDATE) {
     faststring buf;
     RowChangeListEncoder rcl_encoder(&buf);
@@ -490,7 +514,8 @@ Status RowOperationsPBDecoder::DecodeUpdateOrDelete(const ClientServerMapping& m
         uint8_t scratch[kLargestTypeSize];
         uint8_t* val_to_add = nullptr;
         if (!client_set_to_null) {
-          RETURN_NOT_OK(ReadColumn(col, scratch));
+          RETURN_NOT_OK(ReadColumn(col, scratch, &row_status));
+          if (PREDICT_FALSE(!row_status.ok())) op->SetFailureStatusOnce(row_status);
           val_to_add = scratch;
         } else if (PREDICT_FALSE(!col.is_nullable())) {
           op->SetFailureStatusOnce(Status::InvalidArgument(
@@ -570,7 +595,7 @@ Status RowOperationsPBDecoder::DecodeSplitRow(const ClientServerMapping& mapping
     if (BitmapTest(client_isset_map, client_col_idx)) {
       // If the client provided a value for this column, copy it.
       Slice column_slice;
-      RETURN_NOT_OK(GetColumnSlice(col, &column_slice));
+      RETURN_NOT_OK(GetColumnSlice(col, &column_slice, nullptr));
       const uint8_t* data;
       if (col.type_info()->physical_type() == BINARY) {
         data = reinterpret_cast<const uint8_t*>(&column_slice);
diff --git a/src/kudu/common/row_operations.h b/src/kudu/common/row_operations.h
index c1f0bae..0f5224e 100644
--- a/src/kudu/common/row_operations.h
+++ b/src/kudu/common/row_operations.h
@@ -104,8 +104,14 @@ class RowOperationsPBDecoder {
   Status ReadOpType(RowOperationsPB::Type* type);
   Status ReadIssetBitmap(const uint8_t** bitmap);
   Status ReadNullBitmap(const uint8_t** null_bm);
-  Status GetColumnSlice(const ColumnSchema& col, Slice* slice);
-  Status ReadColumn(const ColumnSchema& col, uint8_t* dst);
+  // Read one row's column data from 'src_', read result is stored in 'slice'.
+  // Return bad Status if data is corrupt.
+  // NOTE: If 'row_status' is not nullptr, column data validate will be performed,
+  // and if column data validate error (i.e. column size exceed the limit), only
+  // set bad Status to 'row_status', and return Status::OK.
+  Status GetColumnSlice(const ColumnSchema& col, Slice* slice, Status* row_status);
+  // Same as above, but store result in 'dst'.
+  Status ReadColumn(const ColumnSchema& col, uint8_t* dst, Status* row_status);
   // Some column which is non-nullable has allocated a cell to row data in
   // RowOperationsPBEncoder::Add, even if its data is useless (i.e. set to
   // NULL), we have to consume data in order to properly validate subsequent
diff --git a/src/kudu/tablet/row_op.cc b/src/kudu/tablet/row_op.cc
index 3c084a0..464a8ed 100644
--- a/src/kudu/tablet/row_op.cc
+++ b/src/kudu/tablet/row_op.cc
@@ -37,8 +37,6 @@ RowOp::RowOp(DecodedRowOperation op)
     : decoded_op(std::move(op)) {
   if (!decoded_op.result.ok()) {
     SetFailed(decoded_op.result);
-    // This row has been validated as invalid in the prior phase.
-    validated = true;
   }
 }
 
@@ -64,7 +62,6 @@ std::string RowOp::ToString(const Schema& schema) const {
 }
 
 void RowOp::SetSkippedResult(const OperationResultPB& result) {
-  DCHECK(!this->result) << SecureDebugString(*this->result);
   DCHECK(result.skip_on_replay());
   this->result.reset(new OperationResultPB(result));
 }
diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h
index 5f2124f..c63a345 100644
--- a/src/kudu/tablet/row_op.h
+++ b/src/kudu/tablet/row_op.h
@@ -83,8 +83,8 @@ struct RowOp {
   // phase.
   ScopedRowLock row_lock;
 
-  // Flag whether this op has already been validated.
-  bool validated = false;
+  // Flag whether this op has already been validated as valid.
+  bool valid = false;
 
   // Flag whether this op has already had 'present_in_rowset' filled in.
   // If false, 'present_in_rowset' must be nullptr. If true, and
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index c97d93d..39f1a25 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -18,8 +18,6 @@
 #include "kudu/tablet/tablet.h"
 
 #include <algorithm>
-#include <cstdlib>
-#include <cstring>
 #include <iterator>
 #include <memory>
 #include <mutex>
@@ -47,7 +45,6 @@
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
-#include "kudu/common/types.h"
 #include "kudu/common/wire_protocol.pb.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/opid.pb.h"
@@ -142,12 +139,6 @@ DEFINE_int32(tablet_history_max_age_sec, 60 * 60 * 24 * 7,
 TAG_FLAG(tablet_history_max_age_sec, advanced);
 TAG_FLAG(tablet_history_max_age_sec, stable);
 
-DEFINE_int32(max_cell_size_bytes, 64 * 1024,
-             "The maximum size of any individual cell in a table. Attempting to store "
-             "string or binary columns with a size greater than this will result "
-             "in errors.");
-TAG_FLAG(max_cell_size_bytes, unsafe);
-
 // Large encoded keys cause problems because we store the min/max encoded key in the
 // CFile footer for the composite key column. The footer has a max length of 64K, so
 // the default here comfortably fits two of them with room for other metadata.
@@ -426,7 +417,7 @@ Status Tablet::DecodeWriteOperations(const Schema* client_schema,
                                      WriteTransactionState* tx_state) {
   TRACE_EVENT0("tablet", "Tablet::DecodeWriteOperations");
 
-  DCHECK_EQ(tx_state->row_ops().size(), 0);
+  DCHECK(tx_state->row_ops().empty());
 
   // Acquire the schema lock in shared mode, so that the schema doesn't
   // change while this transaction is in-flight.
@@ -484,6 +475,9 @@ Status Tablet::CheckRowInTablet(const ConstContiguousRow& row) const {
 Status Tablet::AcquireLockForOp(WriteTransactionState* tx_state, RowOp* op) {
   ConstContiguousRow row_key(&key_schema_, op->decoded_op.row_data);
   op->key_probe.reset(new tablet::RowSetKeyProbe(row_key));
+  if (PREDICT_FALSE(!ValidateOpOrMarkFailed(op))) {
+    return Status::OK();
+  }
   RETURN_NOT_OK(CheckRowInTablet(row_key));
 
   op->row_lock = ScopedRowLock(&lock_manager_,
@@ -519,8 +513,12 @@ void Tablet::StartTransaction(WriteTransactionState* tx_state) {
   tx_state->SetMvccTx(std::move(mvcc_tx));
 }
 
-bool Tablet::ValidateOpOrMarkFailed(RowOp* op) const {
-  if (op->validated) return true;
+bool Tablet::ValidateOpOrMarkFailed(RowOp* op) {
+  if (op->valid) return true;
+  if (PREDICT_FALSE(op->has_result())) {
+    DCHECK(op->result->has_failed_status());
+    return false;
+  }
 
   Status s = ValidateOp(*op);
   if (PREDICT_FALSE(!s.ok())) {
@@ -528,11 +526,11 @@ bool Tablet::ValidateOpOrMarkFailed(RowOp* op) const {
     op->SetFailed(s);
     return false;
   }
-  op->validated = true;
+  op->valid = true;
   return true;
 }
 
-Status Tablet::ValidateOp(const RowOp& op) const {
+Status Tablet::ValidateOp(const RowOp& op) {
   switch (op.decoded_op.type) {
     case RowOperationsPB::INSERT:
     case RowOperationsPB::UPSERT:
@@ -543,28 +541,12 @@ Status Tablet::ValidateOp(const RowOp& op) const {
       return ValidateMutateUnlocked(op);
 
     default:
-      LOG_WITH_PREFIX(FATAL) << RowOperationsPB::Type_Name(op.decoded_op.type);
-  }
-  abort(); // unreachable
-}
-
-Status Tablet::ValidateInsertOrUpsertUnlocked(const RowOp& op) const {
-  // Check that no individual cell is larger than the specified max.
-  ConstContiguousRow row(schema(), op.decoded_op.row_data);
-  for (int i = 0; i < schema()->num_columns(); i++) {
-    if (!BitmapTest(op.decoded_op.isset_bitmap, i)) continue;
-    const auto& col = schema()->column(i);
-    if (col.type_info()->physical_type() != BINARY) continue;
-    const auto& cell = row.cell(i);
-    if (cell.is_nullable() && cell.is_null()) continue;
-    Slice s;
-    memcpy(&s, cell.ptr(), sizeof(s));
-    if (PREDICT_FALSE(s.size() > FLAGS_max_cell_size_bytes)) {
-      return Status::InvalidArgument(Substitute(
-          "value too large for column '$0' ($1 bytes, maximum is $2 bytes)",
-          col.name(), s.size(), FLAGS_max_cell_size_bytes));
-    }
+      LOG(FATAL) << RowOperationsPB::Type_Name(op.decoded_op.type);
   }
+  __builtin_unreachable();
+}
+
+Status Tablet::ValidateInsertOrUpsertUnlocked(const RowOp& op) {
   // Check that the encoded key is not longer than the maximum.
   auto enc_key_size = op.key_probe->encoded_key_slice().size();
   if (PREDICT_FALSE(enc_key_size > FLAGS_max_encoded_key_size_bytes)) {
@@ -575,7 +557,7 @@ Status Tablet::ValidateInsertOrUpsertUnlocked(const RowOp& op) const {
   return Status::OK();
 }
 
-Status Tablet::ValidateMutateUnlocked(const RowOp& op) const {
+Status Tablet::ValidateMutateUnlocked(const RowOp& op) {
   RowChangeListDecoder rcl_decoder(op.decoded_op.changelist);
   RETURN_NOT_OK(rcl_decoder.Init());
   if (rcl_decoder.is_reinsert()) {
@@ -591,21 +573,7 @@ Status Tablet::ValidateMutateUnlocked(const RowOp& op) const {
     return Status::OK();
   }
 
-  // For updates, just check the new cell values themselves, and not the row key,
-  // following the same logic.
-  while (rcl_decoder.HasNext()) {
-    RowChangeListDecoder::DecodedUpdate cell_update;
-    RETURN_NOT_OK(rcl_decoder.DecodeNext(&cell_update));
-    if (cell_update.null) continue;
-    Slice s = cell_update.raw_value;
-    if (PREDICT_FALSE(s.size() > FLAGS_max_cell_size_bytes)) {
-      const auto& col = schema()->column_by_id(cell_update.col_id);
-      return Status::InvalidArgument(Substitute(
-          "value too large for column '$0' ($1 bytes, maximum is $2 bytes)",
-          col.name(), s.size(), FLAGS_max_cell_size_bytes));
-
-    }
-  }
+  DCHECK(rcl_decoder.is_update());
   return Status::OK();
 }
 
@@ -614,7 +582,7 @@ Status Tablet::InsertOrUpsertUnlocked(const IOContext* io_context,
                                       RowOp* op,
                                       ProbeStats* stats) {
   DCHECK(op->checked_present);
-  DCHECK(op->validated);
+  DCHECK(op->valid);
 
   const bool is_upsert = op->decoded_op.type == RowOperationsPB::UPSERT;
   const TabletComponents* comps = DCHECK_NOTNULL(tx_state->tablet_components());
@@ -753,7 +721,7 @@ Status Tablet::MutateRowUnlocked(const IOContext* io_context,
                                  RowOp* mutate,
                                  ProbeStats* stats) {
   DCHECK(mutate->checked_present);
-  DCHECK(mutate->validated);
+  DCHECK(mutate->valid);
 
   gscoped_ptr<OperationResultPB> result(new OperationResultPB());
   const TabletComponents* comps = DCHECK_NOTNULL(tx_state->tablet_components());
@@ -932,11 +900,6 @@ Status Tablet::ApplyRowOperations(WriteTransactionState* tx_state) {
 
   StartApplying(tx_state);
 
-  // Validate all of the ops.
-  for (RowOp* op : tx_state->row_ops()) {
-    ValidateOpOrMarkFailed(op);
-  }
-
   IOContext io_context({ tablet_id() });
   RETURN_NOT_OK(BulkCheckPresence(&io_context, tx_state));
 
@@ -944,7 +907,6 @@ Status Tablet::ApplyRowOperations(WriteTransactionState* tx_state) {
   for (int op_idx = 0; op_idx < num_ops; op_idx++) {
     RowOp* row_op = tx_state->row_ops()[op_idx];
     if (row_op->has_result()) continue;
-
     RETURN_NOT_OK(ApplyRowOperation(&io_context, tx_state, row_op,
                                     tx_state->mutable_op_stats(op_idx)));
     DCHECK(row_op->has_result());
@@ -960,6 +922,10 @@ Status Tablet::ApplyRowOperation(const IOContext* io_context,
                                  WriteTransactionState* tx_state,
                                  RowOp* row_op,
                                  ProbeStats* stats) {
+  if (!ValidateOpOrMarkFailed(row_op)) {
+    return Status::OK();
+  }
+
   {
     std::lock_guard<simple_spinlock> l(state_lock_);
     RETURN_NOT_OK_PREPEND(CheckHasNotBeenStoppedUnlocked(),
@@ -971,10 +937,6 @@ Status Tablet::ApplyRowOperation(const IOContext* io_context,
   DCHECK(tx_state->op_id().IsInitialized()) << "TransactionState OpId needed for anchoring";
   DCHECK_EQ(tx_state->schema_at_decode_time(), schema());
 
-  if (!ValidateOpOrMarkFailed(row_op)) {
-    return Status::OK();
-  }
-
   // If we were unable to check rowset presence in batch (e.g. because we are processing
   // a batch which contains some duplicate keys) we need to do so now.
   if (PREDICT_FALSE(!row_op->checked_present)) {
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 7110a5b..d7bfb8d 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -526,20 +526,17 @@ class Tablet {
   Status FlushUnlocked();
 
   // Validate the contents of 'op' and return a bad Status if it is invalid.
-  Status ValidateOp(const RowOp& op) const;
+  static Status ValidateOp(const RowOp& op);
 
   // Validate 'op' as in 'ValidateOp()' above. If it is invalid, marks the op as failed
-  // and returns false. If valid, marks the op as validated and returns true.
-  bool ValidateOpOrMarkFailed(RowOp* op) const;
+  // and returns false. If valid, marks the op as valid and returns true.
+  static bool ValidateOpOrMarkFailed(RowOp* op);
 
-  // Validate the given insert/upsert operation. In particular, checks that the size
-  // of any cells is not too large given the configured maximum on the server, and
-  // that the encoded key is not too large.
-  Status ValidateInsertOrUpsertUnlocked(const RowOp& op) const;
+  // Validate the given insert/upsert operation.
+  static Status ValidateInsertOrUpsertUnlocked(const RowOp& op);
 
-  // Validate the given update/delete operation. In particular, validates that no
-  // cell is being updated to an invalid (too large) value.
-  Status ValidateMutateUnlocked(const RowOp& op) const;
+  // Validate the given update/delete operation.
+  static Status ValidateMutateUnlocked(const RowOp& op);
 
   // Perform an INSERT or UPSERT operation, assuming that the transaction is already in
   // prepared state. This state ensures that:
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index d37157e..5abc342 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1586,7 +1586,7 @@ Status TabletBootstrap::ApplyOperations(const IOContext* io_context,
     // Actually apply it.
     ProbeStats stats; // we don't use this, but tablet internals require non-NULL.
     RETURN_NOT_OK(tablet_->ApplyRowOperation(io_context, tx_state, op, &stats));
-    DCHECK(op->result != nullptr);
+    DCHECK(op->has_result());
 
     // We expect that the above Apply() will always succeed, because we're
     // applying an operation that we know succeeded before the server