You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2017/10/06 04:33:43 UTC

[2/2] kudu git commit: KUDU-2167: fix C++ client crash due to bad assumption regarding scan data

KUDU-2167: fix C++ client crash due to bad assumption regarding scan data

The new unit test triggered the crash reliably, though it's probably not the
only way (or perhaps not even the best way) to trigger it.

I also modified the tserver to always populate the scan data fields so that
older clients are also protected. This necessitated an additional change to
how a scanner's "data in open" is computed: the existence of the 'data'
field in the scan response isn't enough on its own; the field must also hold
at least one row. The Java client was already computing it this way, so it
seems like a reasonable change to make on the C++ side.

Change-Id: If1a8a4e22082cf39710b9f00894f644a0b34234e
Reviewed-on: http://gerrit.cloudera.org:8080/8204
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/858bf73b
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/858bf73b
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/858bf73b

Branch: refs/heads/master
Commit: 858bf73b380d10f16e7aca04a780a0adae705e9a
Parents: 1ee4952
Author: Adar Dembo <ad...@cloudera.com>
Authored: Tue Oct 3 18:34:18 2017 -0700
Committer: Adar Dembo <ad...@cloudera.com>
Committed: Fri Oct 6 04:30:50 2017 +0000

----------------------------------------------------------------------
 src/kudu/client/client-test.cc      | 33 ++++++++++++++++++
 src/kudu/client/scanner-internal.cc | 26 ++++++++------
 src/kudu/tserver/tablet_service.cc  | 59 ++++++++++++--------------------
 3 files changed, 71 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/858bf73b/src/kudu/client/client-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index c888513..450449e 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -5307,5 +5307,38 @@ TEST_F(ClientTest, TestVerboseLevelByEnvVar) {
   ASSERT_EQ(0, FLAGS_v);
 }
 
+// Regression test for KUDU-2167: older versions of Kudu could return a scan
+// response without a 'data' field, crashing the client.
+TEST_F(ClientTest, TestSubsequentScanRequestReturnsNoData) {
+  // Insert some rows.
+  NO_FATALS(InsertTestRows(client_table_.get(), FLAGS_test_scan_num_rows));
+
+  // Set up a table scan.
+  KuduScanner scanner(client_table_.get());
+  ASSERT_OK(scanner.SetProjectedColumns({ "key" }));
+
+  // Ensure that the new scan RPC does not return the data.
+  //
+  // It's OK to leave the scanner configured like this; after the new scan RPC
+  // the server will still return at least one block of data per RPC.
+  ASSERT_OK(scanner.SetBatchSizeBytes(0));
+
+  // This scan should not match any of the inserted rows.
+  unique_ptr<KuduPartialRow> row(client_table_->schema().NewRow());
+  ASSERT_OK(row->SetInt32("key", -1));
+  ASSERT_OK(scanner.AddExclusiveUpperBound(*row));
+
+  // Perform the scan.
+  ASSERT_OK(scanner.Open());
+  ASSERT_TRUE(scanner.HasMoreRows());
+  int count = 0;
+  KuduScanBatch batch;
+  while (scanner.HasMoreRows()) {
+    ASSERT_OK(scanner.NextBatch(&batch));
+    count += batch.NumRows();
+  }
+  ASSERT_EQ(0, count);
+}
+
 } // namespace client
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/858bf73b/src/kudu/client/scanner-internal.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index f61a5a3..92948c9 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -444,7 +444,7 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
   partition_pruner_.RemovePartitionKeyRange(remote_->partition().partition_key_end());
 
   next_req_.clear_new_scan_request();
-  data_in_open_ = last_response_.has_data();
+  data_in_open_ = last_response_.has_data() && last_response_.data().num_rows() > 0;
   if (last_response_.has_more_results()) {
     next_req_.set_scanner_id(last_response_.scanner_id());
     VLOG(2) << "Opened tablet " << remote_->tablet_id()
@@ -550,8 +550,16 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
   CHECK(controller->finished());
   controller_.Swap(controller);
   projection_ = projection;
+  projected_row_size_ = CalculateProjectedRowSize(*projection_);
   client_projection_ = client_projection;
   row_format_flags_ = row_format_flags;
+  if (!resp_data) {
+    // No new data; just clear out the old stuff.
+    resp_data_.Clear();
+    return Status::OK();
+  }
+
+  // There's new data. Swap it in and process it.
   resp_data_.Swap(resp_data.get());
 
   // First, rewrite the relative addresses into absolute ones.
@@ -561,16 +569,16 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
 
   Status s = controller_.GetInboundSidecar(resp_data_.rows_sidecar(), &direct_data_);
   if (!s.ok()) {
-    return Status::Corruption("Server sent invalid response: row data "
-        "sidecar index corrupt", s.ToString());
+    return Status::Corruption("Server sent invalid response: "
+        "row data sidecar index corrupt", s.ToString());
   }
 
   if (resp_data_.has_indirect_data_sidecar()) {
     Status s = controller_.GetInboundSidecar(resp_data_.indirect_data_sidecar(),
-                                      &indirect_data_);
+                                             &indirect_data_);
     if (!s.ok()) {
-      return Status::Corruption("Server sent invalid response: indirect data "
-                                "sidecar index corrupt", s.ToString());
+      return Status::Corruption("Server sent invalid response: "
+          "indirect data sidecar index corrupt", s.ToString());
     }
   }
 
@@ -579,10 +587,8 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
     pad_unixtime_micros_to_16_bytes = true;
   }
 
-  RETURN_NOT_OK(RewriteRowBlockPointers(*projection_, resp_data_, indirect_data_, &direct_data_,
-                                        pad_unixtime_micros_to_16_bytes));
-  projected_row_size_ = CalculateProjectedRowSize(*projection_);
-  return Status::OK();
+  return RewriteRowBlockPointers(*projection_, resp_data_, indirect_data_, &direct_data_,
+                                 pad_unixtime_micros_to_16_bytes);
 }
 
 void KuduScanBatch::Data::ExtractRows(vector<KuduScanBatch::RowPtr>* rows) {

http://git-wip-us.apache.org/repos/asf/kudu/blob/858bf73b/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index be86cb3..bb4bc59 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -419,9 +419,6 @@ class ScanResultCollector {
   virtual void HandleRowBlock(const Schema* client_projection_schema,
                               const RowBlock& row_block) = 0;
 
-  // Returns number of times HandleRowBlock() was called.
-  virtual int BlocksProcessed() const = 0;
-
   // Returns number of bytes which will be returned in the response.
   virtual int64_t ResponseSize() const = 0;
 
@@ -478,21 +475,17 @@ class ScanResultCopier : public ScanResultCollector {
       : rowblock_pb_(DCHECK_NOTNULL(rowblock_pb)),
         rows_data_(DCHECK_NOTNULL(rows_data)),
         indirect_data_(DCHECK_NOTNULL(indirect_data)),
-        blocks_processed_(0),
         num_rows_returned_(0),
         pad_unixtime_micros_to_16_bytes_(false) {}
 
   void HandleRowBlock(const Schema* client_projection_schema,
                               const RowBlock& row_block) override {
-    blocks_processed_++;
     num_rows_returned_ += row_block.selection_vector()->CountSelected();
     SerializeRowBlock(row_block, rowblock_pb_, client_projection_schema,
                       rows_data_, indirect_data_, pad_unixtime_micros_to_16_bytes_);
     SetLastRow(row_block, &last_primary_key_);
   }
 
-  int BlocksProcessed() const override { return blocks_processed_; }
-
   // Returns number of bytes buffered to return.
   int64_t ResponseSize() const override {
     return rows_data_->size() + indirect_data_->size();
@@ -516,7 +509,6 @@ class ScanResultCopier : public ScanResultCollector {
   RowwiseRowBlockPB* const rowblock_pb_;
   faststring* const rows_data_;
   faststring* const indirect_data_;
-  int blocks_processed_;
   int64_t num_rows_returned_;
   faststring last_primary_key_;
   bool pad_unixtime_micros_to_16_bytes_;
@@ -530,13 +522,11 @@ class ScanResultChecksummer : public ScanResultCollector {
   ScanResultChecksummer()
       : crc_(crc::GetCrc32cInstance()),
         agg_checksum_(0),
-        blocks_processed_(0),
         rows_checksummed_(0) {
   }
 
   virtual void HandleRowBlock(const Schema* client_projection_schema,
                               const RowBlock& row_block) OVERRIDE {
-    blocks_processed_++;
     if (!client_projection_schema) {
       client_projection_schema = &row_block.schema();
     }
@@ -552,8 +542,6 @@ class ScanResultChecksummer : public ScanResultCollector {
     SetLastRow(row_block, &encoded_last_row_);
   }
 
-  virtual int BlocksProcessed() const OVERRIDE { return blocks_processed_; }
-
   // Returns a constant -- we only return checksum based on a time budget.
   virtual int64_t ResponseSize() const OVERRIDE { return sizeof(agg_checksum_); }
 
@@ -602,7 +590,6 @@ class ScanResultChecksummer : public ScanResultCollector {
   faststring tmp_buf_;
   crc::Crc* const crc_;
   uint64_t agg_checksum_;
-  int blocks_processed_;
   int64_t rows_checksummed_;
   faststring encoded_last_row_;
 
@@ -1310,31 +1297,29 @@ void TabletServiceImpl::Scan(const ScanRequestPB* req,
   }
   resp->set_has_more_results(has_more_results);
 
-  DVLOG(2) << "Blocks processed: " << collector.BlocksProcessed();
-  if (collector.BlocksProcessed() > 0) {
-    resp->mutable_data()->CopyFrom(data);
-
-    // Add sidecar data to context and record the returned indices.
-    int rows_idx;
-    CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring((std::move(rows_data))),
-            &rows_idx));
-    resp->mutable_data()->set_rows_sidecar(rows_idx);
-
-    // Add indirect data as a sidecar, if applicable.
-    if (indirect_data->size() > 0) {
-      int indirect_idx;
-      CHECK_OK(context->AddOutboundSidecar(RpcSidecar::FromFaststring(
-          std::move(indirect_data)), &indirect_idx));
-      resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
-    }
+  resp->mutable_data()->CopyFrom(data);
 
-    // Set the last row found by the collector.
-    // We could have an empty batch if all the remaining rows are filtered by the predicate,
-    // in which case do not set the last row.
-    const faststring& last = collector.last_primary_key();
-    if (last.length() > 0) {
-      resp->set_last_primary_key(last.ToString());
-    }
+  // Add sidecar data to context and record the returned indices.
+  int rows_idx;
+  CHECK_OK(context->AddOutboundSidecar(
+      RpcSidecar::FromFaststring((std::move(rows_data))), &rows_idx));
+  resp->mutable_data()->set_rows_sidecar(rows_idx);
+
+  // Add indirect data as a sidecar, if applicable.
+  if (indirect_data->size() > 0) {
+    int indirect_idx;
+    CHECK_OK(context->AddOutboundSidecar(
+        RpcSidecar::FromFaststring(std::move(indirect_data)), &indirect_idx));
+    resp->mutable_data()->set_indirect_data_sidecar(indirect_idx);
+  }
+
+  // Set the last row found by the collector.
+  //
+  // We could have an empty batch if all the remaining rows are filtered by the
+  // predicate, in which case do not set the last row.
+  const faststring& last = collector.last_primary_key();
+  if (last.length() > 0) {
+    resp->set_last_primary_key(last.ToString());
   }
   resp->set_propagated_timestamp(server_->clock()->Now().ToUint64());
   SetResourceMetrics(resp->mutable_resource_metrics(), context);