You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/01 22:24:17 UTC

[kudu] branch master updated (00e7ca6 -> fa2d4c7)

This is an automated email from the ASF dual-hosted git repository.

todd pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 00e7ca6  tserver: add support for returning scan result in columnar layout
     new abd29cc  [ranger] authorize list tables should never throw NotAuthorized
     new fa2d4c7  client: add support for columnar format scan

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/client/CMakeLists.txt                     |   2 +
 src/kudu/client/client-test.cc                     |  45 ++++++++
 src/kudu/client/client.cc                          |  41 ++++---
 src/kudu/client/client.h                           |  29 +++++
 ...{write_op-internal.h => columnar_scan_batch.cc} |  35 +++++-
 src/kudu/client/columnar_scan_batch.h              |  90 +++++++++++++++
 src/kudu/client/scanner-internal.cc                | 121 ++++++++++++++++++++-
 src/kudu/client/scanner-internal.h                 |  59 +++++++++-
 src/kudu/master/ranger_authz_provider.cc           |   6 +
 src/kudu/ranger/ranger_client-test.cc              |  13 +--
 src/kudu/ranger/ranger_client.cc                   |  10 --
 src/kudu/ranger/ranger_client.h                    |   6 +-
 src/kudu/tools/tool_action_remote_replica.cc       |   2 +-
 13 files changed, 409 insertions(+), 50 deletions(-)
 copy src/kudu/client/{write_op-internal.h => columnar_scan_batch.cc} (50%)
 create mode 100644 src/kudu/client/columnar_scan_batch.h


[kudu] 02/02: client: add support for columnar format scan

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit fa2d4c756646b3f8428d2931113e8dfc3bc5af79
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Thu Mar 26 22:28:06 2020 -0700

    client: add support for columnar format scan
    
    Change-Id: I69ec4487c78b872e7b9eba5facdd44cfd6b8f4fa
    Reviewed-on: http://gerrit.cloudera.org:8080/15622
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/CMakeLists.txt               |   2 +
 src/kudu/client/client-test.cc               |  45 ++++++++++
 src/kudu/client/client.cc                    |  41 +++++----
 src/kudu/client/client.h                     |  29 +++++++
 src/kudu/client/columnar_scan_batch.cc       |  55 ++++++++++++
 src/kudu/client/columnar_scan_batch.h        |  90 ++++++++++++++++++++
 src/kudu/client/scanner-internal.cc          | 121 +++++++++++++++++++++++++--
 src/kudu/client/scanner-internal.h           |  59 ++++++++++++-
 src/kudu/tools/tool_action_remote_replica.cc |   2 +-
 9 files changed, 421 insertions(+), 23 deletions(-)

diff --git a/src/kudu/client/CMakeLists.txt b/src/kudu/client/CMakeLists.txt
index ffd536a..f456757 100644
--- a/src/kudu/client/CMakeLists.txt
+++ b/src/kudu/client/CMakeLists.txt
@@ -34,6 +34,7 @@ set(CLIENT_SRCS
   client.cc
   client_builder-internal.cc
   client-internal.cc
+  columnar_scan_batch.cc
   error_collector.cc
   error-internal.cc
   hash.cc
@@ -177,6 +178,7 @@ install(TARGETS kudu_client_exported
 install(FILES
   callbacks.h
   client.h
+  columnar_scan_batch.h
   hash.h
   row_result.h
   scan_batch.h
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index f8bca5d..668c9d3 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -51,6 +51,7 @@
 #include "kudu/client/client-internal.h"
 #include "kudu/client/client-test-util.h"
 #include "kudu/client/client.pb.h"
+#include "kudu/client/columnar_scan_batch.h"
 #include "kudu/client/error_collector.h"
 #include "kudu/client/meta_cache.h"
 #include "kudu/client/resource_metrics.h"
@@ -104,6 +105,7 @@
 #include "kudu/tserver/tablet_server_options.h"
 #include "kudu/tserver/ts_tablet_manager.h"
 #include "kudu/tserver/tserver.pb.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/barrier.h"
 #include "kudu/util/countdown_latch.h"
@@ -1051,6 +1053,49 @@ TEST_F(ClientTest, TestScanAtFutureTimestamp) {
   ASSERT_STR_CONTAINS(s.ToString(), "in the future.");
 }
 
+TEST_F(ClientTest, TestColumnarScan) {
+  // Set the batch size such that a full scan could yield either multi-batch
+  // or single-batch scans.
+  int64_t num_rows = rand() % 2000;  // may be 0, in which case nothing is inserted
+  int64_t batch_size = rand() % 1000;
+  FLAGS_scanner_batch_size_rows = batch_size;
+
+  NO_FATALS(InsertTestRows(client_table_.get(), num_rows));
+  KuduScanner scanner(client_table_.get());
+  ASSERT_OK(scanner.SetRowFormatFlags(KuduScanner::COLUMNAR_LAYOUT));
+
+  ASSERT_OK(scanner.Open());
+  KuduColumnarScanBatch batch;  // reused for every NextBatch() call below
+  int total_rows = 0;  // rows verified so far
+  while (scanner.HasMoreRows()) {
+    ASSERT_OK(scanner.NextBatch(&batch));
+
+    // Verify the data: fetch each column's raw buffer and view it as a typed array.
+    Slice col_data[4];
+    for (int i = 0; i < 4; i++) {
+      ASSERT_OK(batch.GetDataForColumn(i, &col_data[i]));
+    }
+    ArrayView<const int32_t> c0(reinterpret_cast<const int32_t*>(col_data[0].data()),
+                                batch.NumRows());
+    ArrayView<const int32_t> c1(reinterpret_cast<const int32_t*>(col_data[1].data()),
+                                batch.NumRows());
+    ArrayView<const Slice> c2(reinterpret_cast<const Slice*>(col_data[2].data()),
+                              batch.NumRows());
+    ArrayView<const int32_t> c3(reinterpret_cast<const int32_t*>(col_data[3].data()),
+                                batch.NumRows());
+
+    for (int i = 0; i < batch.NumRows(); i++) {
+      int row_idx = total_rows + i;  // position of this row within the whole scan
+      EXPECT_EQ(row_idx, c0[i]);
+      EXPECT_EQ(row_idx * 2, c1[i]);
+      EXPECT_EQ(Substitute("hello $0", row_idx), c2[i]);
+      EXPECT_EQ(row_idx * 3, c3[i]);
+    }
+    total_rows += batch.NumRows();
+  }
+  ASSERT_EQ(num_rows, total_rows);  // the scan returned as many rows as were inserted
+}
+
 const KuduScanner::ReadMode read_modes[] = {
     KuduScanner::READ_LATEST,
     KuduScanner::READ_AT_SNAPSHOT,
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 571844a..968537b 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -37,6 +37,7 @@
 #include "kudu/client/client-internal.h"
 #include "kudu/client/client.pb.h"
 #include "kudu/client/client_builder-internal.h"
+#include "kudu/client/columnar_scan_batch.h"
 #include "kudu/client/error-internal.h"
 #include "kudu/client/error_collector.h"
 #include "kudu/client/master_proxy_rpc.h"
@@ -1569,6 +1570,7 @@ Status KuduScanner::SetRowFormatFlags(uint64_t flags) {
   switch (flags) {
     case NO_FLAGS:
     case PAD_UNIXTIME_MICROS_TO_16_BYTES:
+    case COLUMNAR_LAYOUT:
       break;
     default:
       return Status::InvalidArgument(Substitute("Invalid row format flags: $0", flags));
@@ -1711,6 +1713,15 @@ Status KuduScanner::NextBatch(vector<KuduRowResult>* rows) {
 }
 
 Status KuduScanner::NextBatch(KuduScanBatch* batch) {
+  return NextBatch(batch->data_);  // row-wise entry point; shared logic takes the Data interface
+}
+
+Status KuduScanner::NextBatch(KuduColumnarScanBatch* batch) {
+  return NextBatch(batch->data_);  // columnar entry point; same shared logic as the row-wise one
+}
+
+Status KuduScanner::NextBatch(internal::ScanBatchDataInterface* batch_data) {
+
   // TODO: do some double-buffering here -- when we return this batch
   // we should already have fired off the RPC for the next batch, but
   // need to do some swapping of the response objects around to avoid
@@ -1718,7 +1729,7 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
   CHECK(data_->open_);
   CHECK(data_->proxy_);
 
-  batch->data_->Clear();
+  batch_data->Clear();
 
   if (data_->short_circuit_) {
     return Status::OK();
@@ -1728,12 +1739,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
     // We have data from a previous scan.
     VLOG(2) << "Extracting data from " << data_->DebugString();
     data_->data_in_open_ = false;
-    return batch->data_->Reset(&data_->controller_,
-                               data_->configuration().projection(),
-                               data_->configuration().client_projection(),
-                               data_->configuration().row_format_flags(),
-                               unique_ptr<RowwiseRowBlockPB>(
-                                   data_->last_response_.release_data()));
+    return batch_data->Reset(&data_->controller_,
+                             data_->configuration().projection(),
+                             data_->configuration().client_projection(),
+                             data_->configuration().row_format_flags(),
+                             &data_->last_response_);
   }
 
   if (data_->last_response_.has_more_results()) {
@@ -1753,12 +1763,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
           data_->last_primary_key_ = data_->last_response_.last_primary_key();
         }
         data_->scan_attempts_ = 0;
-        return batch->data_->Reset(&data_->controller_,
-                                   data_->configuration().projection(),
-                                   data_->configuration().client_projection(),
-                                   data_->configuration().row_format_flags(),
-                                   unique_ptr<RowwiseRowBlockPB>(
-                                       data_->last_response_.release_data()));
+        return batch_data->Reset(&data_->controller_,
+                                 data_->configuration().projection(),
+                                 data_->configuration().client_projection(),
+                                 data_->configuration().row_format_flags(),
+                                 &data_->last_response_);
       }
 
       data_->scan_attempts_++;
@@ -1797,7 +1806,11 @@ Status KuduScanner::NextBatch(KuduScanBatch* batch) {
     set<string> blacklist;
 
     RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
-    // No rows written, the next invocation will pick them up.
+    if (data_->data_in_open_) {
+      // Avoid returning an empty batch in between tablets if we have data
+      // we can return from this call.
+      return NextBatch(batch_data);
+    }
     return Status::OK();
   } else {
     // No more data anywhere.
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 5c59ac7..f8d16d8 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -73,6 +73,7 @@ class RemoteKsckCluster;
 
 namespace client {
 
+class KuduColumnarScanBatch;
 class KuduDelete;
 class KuduInsert;
 class KuduInsertIgnore;
@@ -102,6 +103,7 @@ class RemoteTablet;
 class RemoteTabletServer;
 class ReplicaController;
 class RetrieveAuthzTokenRpc;
+class ScanBatchDataInterface;
 class WriteRpc;
 template <class ReqClass, class RespClass>
 class AsyncLeaderMasterRpc; // IWYU pragma: keep
@@ -2255,6 +2257,9 @@ class KUDU_EXPORT KuduScanner {
 
   /// Fetch the next batch of results for this scanner.
   ///
+  /// This variant may not be used when the scan is configured with the
+  /// COLUMNAR_LAYOUT RowFormatFlag.
+  ///
   /// A single KuduScanBatch object may be reused. Each subsequent call
   /// replaces the data from the previous call, and invalidates any
   /// KuduScanBatch::RowPtr objects previously obtained from the batch.
@@ -2263,6 +2268,19 @@ class KUDU_EXPORT KuduScanner {
   /// @return Operation result status.
   Status NextBatch(KuduScanBatch* batch);
 
+  /// Fetch the next batch of columnar results for this scanner.
+  ///
+  /// This variant may only be used when the scan is configured with the
+  /// COLUMNAR_LAYOUT RowFormatFlag.
+  ///
+  /// A single KuduColumnarScanBatch object may be reused. Each subsequent call
+  /// replaces the data from the previous call, and invalidates any
+  /// Slice objects previously obtained from the batch.
+  /// @param [out] batch
+  ///   Placeholder for the result.
+  /// @return Operation result status.
+  Status NextBatch(KuduColumnarScanBatch* batch);
+
   /// Get the KuduTabletServer that is currently handling the scan.
   ///
   /// More concretely, this is the server that handled the most recent
@@ -2389,6 +2407,15 @@ class KUDU_EXPORT KuduScanner {
   ///   results and might even cause the client to crash.
   static const uint64_t PAD_UNIXTIME_MICROS_TO_16_BYTES = 1 << 0;
 
+  /// Enable column-oriented data transfer. The server will transfer data to the client
+  /// in a columnar format rather than a row-wise format. The KuduColumnarScanBatch API
+  /// must be used to fetch results from this scan.
+  ///
+  /// NOTE: older versions of the Kudu server do not support this feature. Clients
+  /// aiming to support compatibility with previous versions should have a fallback
+  /// code path.
+  static const uint64_t COLUMNAR_LAYOUT = 1 << 1;
+
   /// Optionally set row format modifier flags.
   ///
   /// If flags is RowFormatFlags::NO_FLAGS, then no modifications will be made to the row
@@ -2436,6 +2463,8 @@ class KUDU_EXPORT KuduScanner {
  private:
   class KUDU_NO_EXPORT Data;
 
+  Status NextBatch(internal::ScanBatchDataInterface* batch);
+
   friend class KuduScanToken;
   FRIEND_TEST(ClientTest, TestBlockScannerHijackingAttempts);
   FRIEND_TEST(ClientTest, TestScanCloseProxy);
diff --git a/src/kudu/client/columnar_scan_batch.cc b/src/kudu/client/columnar_scan_batch.cc
new file mode 100644
index 0000000..b346445
--- /dev/null
+++ b/src/kudu/client/columnar_scan_batch.cc
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/client/columnar_scan_batch.h"
+
+#include "kudu/client/scanner-internal.h"
+#include "kudu/common/wire_protocol.pb.h"
+#include "kudu/rpc/rpc_controller.h"
+
+namespace kudu {
+class Slice;
+
+namespace client {
+
+KuduColumnarScanBatch::KuduColumnarScanBatch()
+    : data_(new KuduColumnarScanBatch::Data()) {  // owned; deleted in the destructor
+}
+
+KuduColumnarScanBatch::~KuduColumnarScanBatch() {
+  delete data_;  // allocated in the constructor
+}
+
+int KuduColumnarScanBatch::NumRows() const {
+  return data_->resp_data_.num_rows();  // row count carried by the columnar response PB
+}
+
+Status KuduColumnarScanBatch::GetDataForColumn(int idx, Slice* data) const {
+  return data_->GetDataForColumn(idx, data);  // Data validates 'idx' and fetches the sidecar
+}
+
+Status KuduColumnarScanBatch::GetNonNullBitmapForColumn(int idx, Slice* data) const {
+  // Delegate to the Data implementation instead of repeating it here. It
+  // validates 'idx', returns NotFound("column is not nullable") when the
+  // column has no non-null bitmap sidecar, and otherwise points 'data' at
+  // that sidecar. Keeping a single copy of the logic mirrors
+  // GetDataForColumn() above and stops the two versions drifting apart.
+  return data_->GetNonNullBitmapForColumn(idx, data);
+}
+
+} // namespace client
+} // namespace kudu
diff --git a/src/kudu/client/columnar_scan_batch.h b/src/kudu/client/columnar_scan_batch.h
new file mode 100644
index 0000000..23a3e7d
--- /dev/null
+++ b/src/kudu/client/columnar_scan_batch.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_CLIENT_COLUMNAR_SCAN_BATCH_H
+#define KUDU_CLIENT_COLUMNAR_SCAN_BATCH_H
+
+#ifdef KUDU_HEADERS_NO_STUBS
+#include "kudu/gutil/macros.h"
+#else
+#include "kudu/client/stubs.h"
+#endif
+
+#include "kudu/util/kudu_export.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+class Slice;
+
+namespace client {
+
+/// @brief A batch of columnar data returned from a scanner
+///
+/// Similar to KuduScanBatch, this contains a batch of rows returned from a scanner.
+/// This type of batch is used if the KuduScanner::COLUMNAR_LAYOUT row format flag
+/// is enabled.
+///
+/// Retrieving rows in columnar layout can be significantly more efficient. It saves
+/// some CPU cycles on the Kudu cluster and can also enable faster processing of the
+/// returned data in certain client applications.
+///
+/// NOTE: this class is not thread-safe.
+class KUDU_EXPORT KuduColumnarScanBatch {
+ public:
+  KuduColumnarScanBatch();
+  ~KuduColumnarScanBatch();
+
+  /// @return The number of rows in this batch.
+  int NumRows() const;
+
+  /// Get the raw columnar data corresponding to the column with index 'idx' in the projection.
+  ///
+  /// The data is in little-endian packed array format. No alignment is guaranteed.
+  /// Space is reserved for all cells regardless of whether they might be null.
+  /// The data stored in a null cell may or may not be zeroed.
+  ///
+  /// If this returns an error for a given column, then a second call for the same
+  /// column has undefined results.
+  ///
+  /// @note The Slice is only valid until the batch is destroyed or refilled by NextBatch().
+  Status GetDataForColumn(int idx, Slice* data) const;
+
+  /// Get a bitmap corresponding to the non-null status of the cells in the given column.
+  ///
+  /// A set bit indicates a non-null cell.
+  /// If the number of rows is not a multiple of 8, the state of the trailing bits in the
+  /// bitmap is undefined.
+  ///
+  /// It is an error to call this function on a column which is not marked as nullable
+  /// in the schema; a NotFound status is returned if the batch has no bitmap for it.
+  ///
+  /// @note The Slice is only valid until the batch is destroyed or refilled by NextBatch().
+  Status GetNonNullBitmapForColumn(int idx, Slice* data) const;
+
+ private:
+  class KUDU_NO_EXPORT Data;
+
+  friend class KuduScanner;
+
+  Data* data_;  // owned: allocated in the constructor, deleted in the destructor
+  DISALLOW_COPY_AND_ASSIGN(KuduColumnarScanBatch);
+};
+
+
+} // namespace client
+} // namespace kudu
+
+#endif
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 9cb14aa..93006b3 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -38,6 +38,7 @@
 #include "kudu/common/partition.h"
 #include "kudu/common/scan_spec.h"
 #include "kudu/common/schema.h"
+#include "kudu/common/types.h"
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
@@ -49,11 +50,13 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/util/array_view.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/hexdump.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
+#include "kudu/util/safe_math.h"
 
 using google::protobuf::FieldDescriptor;
 using google::protobuf::Reflection;
@@ -70,6 +73,8 @@ using rpc::RpcController;
 using security::SignedTokenPB;
 using strings::Substitute;
 using tserver::NewScanRequestPB;
+using tserver::RowFormatFlags;
+using tserver::ScanResponsePB;
 using tserver::TabletServerFeatures;
 
 namespace client {
@@ -367,6 +372,10 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
   if (configuration().row_format_flags() & KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES) {
     controller_.RequireServerFeature(TabletServerFeatures::PAD_UNIXTIME_MICROS_TO_16_BYTES);
   }
+  if (configuration().row_format_flags() & KuduScanner::COLUMNAR_LAYOUT) {
+    controller_.RequireServerFeature(TabletServerFeatures::COLUMNAR_LAYOUT_FEATURE);
+  }
+
   if (next_req_.has_new_scan_request()) {
     // Only new scan requests require authz tokens. Scan continuations rely on
     // Kudu's prevention of scanner hijacking by different users.
@@ -386,7 +395,9 @@ ScanRpcStatus KuduScanner::Data::SendScanRpc(const MonoTime& overall_deadline,
       rpc_deadline, overall_deadline);
   if (scan_status.result == ScanRpcStatus::OK) {
     UpdateResourceMetrics();
-    num_rows_returned_ += last_response_.data().num_rows();
+    num_rows_returned_ += last_response_.has_data() ? last_response_.data().num_rows() : 0;
+    num_rows_returned_ += last_response_.has_columnar_data() ?
+        last_response_.columnar_data().num_rows() : 0;
   }
   return scan_status;
 }
@@ -561,13 +572,15 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
   partition_pruner_.RemovePartitionKeyRange(remote_->partition().partition_key_end());
 
   next_req_.clear_new_scan_request();
-  data_in_open_ = last_response_.has_data() && last_response_.data().num_rows() > 0;
+  data_in_open_ = (last_response_.has_data() && last_response_.data().num_rows() > 0) ||
+      (last_response_.has_columnar_data() && last_response_.columnar_data().num_rows() > 0);
   if (last_response_.has_more_results()) {
     next_req_.set_scanner_id(last_response_.scanner_id());
     VLOG(2) << "Opened tablet " << remote_->tablet_id()
             << ", scanner ID " << last_response_.scanner_id();
-  } else if (last_response_.has_data()) {
-    VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned";
+  } else if (last_response_.has_data() || last_response_.has_columnar_data()) {
+    VLOG(2) << "Opened tablet " << remote_->tablet_id() << ", no scanner ID assigned, "
+            << " data_in_open=" << data_in_open_;
   } else {
     VLOG(2) << "Opened tablet " << remote_->tablet_id() << " (no rows), no scanner ID assigned";
   }
@@ -670,13 +683,19 @@ Status KuduScanBatch::Data::Reset(RpcController* controller,
                                   const Schema* projection,
                                   const KuduSchema* client_projection,
                                   uint64_t row_format_flags,
-                                  unique_ptr<RowwiseRowBlockPB> resp_data) {
+                                  ScanResponsePB* response) {
   CHECK(controller->finished());
+  if (row_format_flags & RowFormatFlags::COLUMNAR_LAYOUT) {
+    return Status::InvalidArgument("columnar layout specified, must use KuduColumnarScanBatch");
+  }
+
   controller_.Swap(controller);
   projection_ = projection;
   projected_row_size_ = CalculateProjectedRowSize(*projection_);
   client_projection_ = client_projection;
   row_format_flags_ = row_format_flags;
+  unique_ptr<RowwiseRowBlockPB> resp_data(response->release_data());
+
   if (!resp_data) {
     // No new data; just clear out the old stuff.
     resp_data_.Clear();
@@ -748,5 +767,97 @@ void KuduScanBatch::Data::Clear() {
   controller_.Reset();
 }
 
+////////////////////////////////////////////////////////////
+// KuduColumnarScanBatch
+////////////////////////////////////////////////////////////
+
+Status KuduColumnarScanBatch::Data::Reset(
+    rpc::RpcController* controller,
+    const Schema* projection,
+    const KuduSchema* client_projection,
+    uint64_t row_format_flags,
+    tserver::ScanResponsePB* response) {
+  if (!(row_format_flags & RowFormatFlags::COLUMNAR_LAYOUT)) {  // scan is not columnar
+    return Status::InvalidArgument("rowwise layout specified, must use KuduScanBatch");
+  }
+  CHECK(!response->has_data()) << "expected columnar data";
+
+  CHECK(controller->finished());
+  controller_.Swap(controller);  // take over the finished RPC, which holds the sidecars
+  projection_ = projection;
+  client_projection_ = client_projection;
+  rewritten_varlen_columns_.clear();  // no column of the new batch has been rewritten yet
+
+  unique_ptr<ColumnarRowBlockPB> resp_data(response->release_columnar_data());
+  if (!resp_data) {
+    // The response carried no columnar block; just clear out the old one.
+    resp_data_.Clear();
+    return Status::OK();
+  }
+  resp_data_ = std::move(*resp_data);  // adopt the response's columnar block
+  return Status::OK();
+}
+
+void KuduColumnarScanBatch::Data::Clear() {
+  resp_data_.Clear();  // forget the previous batch's row count and column metadata
+  controller_.Reset();  // reset the controller that was swapped in by Reset()
+}
+
+Status KuduColumnarScanBatch::Data::CheckColumnIndex(int idx) const {
+  if (idx < 0 || idx >= resp_data_.columns_size()) {  // 'idx' is signed: reject negatives too
+    return Status::InvalidArgument(Substitute("bad column index $0 ($1 columns present)",
+                                              idx, resp_data_.columns_size()));
+  }
+  return Status::OK();
+}
+
+Status KuduColumnarScanBatch::Data::GetDataForColumn(int idx, Slice* data) const {
+  RETURN_NOT_OK(CheckColumnIndex(idx));
+  RETURN_NOT_OK(controller_.GetInboundSidecar(
+      resp_data_.columns(idx).data_sidecar(),
+      data));
+
+  // Binary cells arrive as (offset, size) pairs relative to the indirect data sidecar;
+  // convert them in place to real pointers, at most once per column per batch.
+  if (projection_->column(idx).type_info()->physical_type() == BINARY &&
+      !ContainsKey(rewritten_varlen_columns_, idx)) {  // not yet rewritten for this batch
+
+    Slice indirect_data_slice;
+    RETURN_NOT_OK(controller_.GetInboundSidecar(
+        resp_data_.columns(idx).indirect_data_sidecar(),
+        &indirect_data_slice));
+
+    ArrayView<Slice> v(reinterpret_cast<Slice*>(data->mutable_data()),
+                       data->size() / sizeof(Slice));
+    for (int row_idx = 0; row_idx < v.size(); row_idx++) {
+      Slice* slice = &v[row_idx];
+      size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data());
+      // Ensure offset + size neither overflows nor runs past the end of the indirect data.
+      bool overflowed = false;
+      size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed);
+      if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) {
+        const auto& col = projection_->column(idx);
+        return Status::Corruption(
+            Substitute("Row #$0 contained bad indirect slice for column $1: ($2, $3)",
+                       row_idx, col.ToString(), offset_in_indirect, slice->size()));
+      }
+      *slice = Slice(&indirect_data_slice[offset_in_indirect], slice->size());
+    }
+    rewritten_varlen_columns_.insert(idx);  // recorded only once every cell was rewritten
+  }
+  return Status::OK();
+}
+
+Status KuduColumnarScanBatch::Data::GetNonNullBitmapForColumn(int idx, Slice* data) const {
+  RETURN_NOT_OK(CheckColumnIndex(idx));
+  const auto& col = resp_data_.columns(idx);
+  if (!col.has_non_null_bitmap_sidecar()) {  // the response has no bitmap for this column
+    return Status::NotFound("column is not nullable");
+  }
+  return controller_.GetInboundSidecar(col.non_null_bitmap_sidecar(), data);
+}
+
+
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index f0d07a0..01da884 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -23,11 +23,13 @@
 #include <ostream>
 #include <set>
 #include <string>
+#include <unordered_set>
 #include <vector>
 
 #include <glog/logging.h>
 
 #include "kudu/client/client.h"
+#include "kudu/client/columnar_scan_batch.h"
 #include "kudu/client/resource_metrics.h"
 #include "kudu/client/row_result.h"
 #include "kudu/client/scan_batch.h"
@@ -294,7 +296,20 @@ class KuduScanner::Data {
   DISALLOW_COPY_AND_ASSIGN(Data);
 };
 
-class KuduScanBatch::Data {
+namespace internal {
+class ScanBatchDataInterface {  // implemented by both the row-wise and columnar batch Data
+ public:
+  virtual ~ScanBatchDataInterface() = default;
+  virtual Status Reset(rpc::RpcController* controller,  // fills the batch from a finished RPC
+                       const Schema* projection,
+                       const KuduSchema* client_projection,
+                       uint64_t row_format_flags,
+                       tserver::ScanResponsePB* response) = 0;
+  virtual void Clear() = 0;  // called by KuduScanner::NextBatch() before refilling the batch
+};
+} // namespace internal
+
+class KuduScanBatch::Data : public internal::ScanBatchDataInterface {
  public:
   Data();
   ~Data();
@@ -303,7 +318,7 @@ class KuduScanBatch::Data {
                const Schema* projection,
                const KuduSchema* client_projection,
                uint64_t row_format_flags,
-               std::unique_ptr<RowwiseRowBlockPB> resp_data);
+               tserver::ScanResponsePB* response) override;
 
   int num_rows() const {
     return resp_data_.num_rows();
@@ -324,7 +339,7 @@ class KuduScanBatch::Data {
 
   void ExtractRows(std::vector<KuduScanBatch::RowPtr>* rows);
 
-  void Clear();
+  void Clear() override;
 
   // Returns the size of a row for the given projection 'proj'.
   static size_t CalculateProjectedRowSize(const Schema& proj);
@@ -354,6 +369,44 @@ class KuduScanBatch::Data {
   size_t projected_row_size_;
 };
 
+class KuduColumnarScanBatch::Data : public internal::ScanBatchDataInterface {
+ public:
+  Status Reset(rpc::RpcController* controller,
+               const Schema* projection,
+               const KuduSchema* client_projection,
+               uint64_t row_format_flags,
+               tserver::ScanResponsePB* response) override;
+  void Clear() override;
+
+  Status GetDataForColumn(int idx, Slice* data) const;  // rewrites BINARY cells in place
+  Status GetNonNullBitmapForColumn(int idx, Slice* data) const;
+
+ private:
+  Status CheckColumnIndex(int idx) const;  // InvalidArgument if 'idx' >= the column count
+
+  friend class KuduColumnarScanBatch;
+
+  // The RPC controller for the RPC which returned this batch.
+  // Holding on to the controller ensures we hold on to the
+  // sidecars which contain the actual data.
+  rpc::RpcController controller_;
+
+  // The columnar block PB: the row count plus, per column, the indexes of its sidecars.
+  ColumnarRowBlockPB resp_data_;
+
+  // Tracks for each variable-length (binary) column whether the pointers have been
+  // rewritten yet to be "real" pointers instead of sidecar-relative offsets.
+  // Mutable since the 'GetDataForColumn' call is semantically const, but in fact
+  // needs to modify this member to do the lazy pointer rewrites.
+  mutable std::unordered_set<int> rewritten_varlen_columns_;
+
+  // The projection being scanned. Set by Reset(); never freed by this class.
+  const Schema* projection_;
+  // The KuduSchema version of 'projection_'. Also set by Reset().
+  const KuduSchema* client_projection_;
+};
+
+
 } // namespace client
 } // namespace kudu
 
diff --git a/src/kudu/tools/tool_action_remote_replica.cc b/src/kudu/tools/tool_action_remote_replica.cc
index 4a30000..44c2de6 100644
--- a/src/kudu/tools/tool_action_remote_replica.cc
+++ b/src/kudu/tools/tool_action_remote_replica.cc
@@ -155,7 +155,7 @@ class ReplicaDumper {
                                   &schema,
                                   &client_schema,
                                   client::KuduScanner::NO_FLAGS,
-                                  unique_ptr<RowwiseRowBlockPB>(resp.release_data())));
+                                  &resp));
       vector<KuduRowResult> rows;
       results.ExtractRows(&rows);
       for (const auto& r : rows) {


[kudu] 01/02: [ranger] authorize list tables should never throw NotAuthorized

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit abd29cc98af31d57251f19de1b68037afce2aff1
Author: Hao Hao <ha...@cloudera.com>
AuthorDate: Mon Mar 30 17:49:32 2020 -0700

    [ranger] authorize list tables should never throw NotAuthorized
    
    When authorizing list tables, the tables on which the user has no
    privileges should be filtered out. However, that should not report a
    NotAuthorized error even if the user has no privileges on any table.
    This also matches the behavior of Sentry authorization.
    
    Change-Id: I2d1df45a82acc70000783f0cbcce4d3c81840176
    Reviewed-on: http://gerrit.cloudera.org:8080/15609
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/master/ranger_authz_provider.cc |  6 ++++++
 src/kudu/ranger/ranger_client-test.cc    | 13 ++++++-------
 src/kudu/ranger/ranger_client.cc         | 10 ----------
 src/kudu/ranger/ranger_client.h          |  6 ++----
 4 files changed, 14 insertions(+), 21 deletions(-)

diff --git a/src/kudu/master/ranger_authz_provider.cc b/src/kudu/master/ranger_authz_provider.cc
index 43ecfe5..8cbb49f 100644
--- a/src/kudu/master/ranger_authz_provider.cc
+++ b/src/kudu/master/ranger_authz_provider.cc
@@ -111,6 +111,12 @@ Status RangerAuthzProvider::AuthorizeListTables(const string& user,
   }
 
   *checked_table_names = true;
+
+  // Return immediately if there is no tables to authorize against.
+  if (table_names->empty()) {
+    return Status::OK();
+  }
+
   // List tables requires 'METADATA ON TABLE' privilege on all tables being listed.
   return client_.AuthorizeActionMultipleTables(user, ActionPB::METADATA, table_names);
 }
diff --git a/src/kudu/ranger/ranger_client-test.cc b/src/kudu/ranger/ranger_client-test.cc
index b15be50..017423e 100644
--- a/src/kudu/ranger/ranger_client-test.cc
+++ b/src/kudu/ranger/ranger_client-test.cc
@@ -170,9 +170,8 @@ TEST_F(RangerClientTest, TestAuthorizeListNoTablesAuthorized) {
   unordered_set<string> tables;
   tables.emplace("foo.bar");
   tables.emplace("foo.baz");
-  auto s = client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables);
-  ASSERT_TRUE(s.IsNotAuthorized());
-  ASSERT_EQ(2, tables.size());
+  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
+  ASSERT_EQ(0, tables.size());
 }
 
 TEST_F(RangerClientTest, TestAuthorizeMetadataSubsetOfTablesAuthorized) {
@@ -201,8 +200,8 @@ TEST_F(RangerClientTest, TestAuthorizeMetadataAllNonRanger) {
   unordered_set<string> tables;
   tables.emplace("foo.");
   tables.emplace(".bar");
-  auto s = client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables);
-  ASSERT_TRUE(s.IsNotAuthorized());
+  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
+  ASSERT_EQ(0, tables.size());
 }
 
 TEST_F(RangerClientTest, TestAuthorizeMetadataNoneAuthorizedContainsNonRanger) {
@@ -211,8 +210,8 @@ TEST_F(RangerClientTest, TestAuthorizeMetadataNoneAuthorizedContainsNonRanger) {
   tables.emplace(".bar");
   tables.emplace("foo.bar");
   tables.emplace("foo.baz");
-  auto s = client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables);
-  ASSERT_TRUE(s.IsNotAuthorized());
+  ASSERT_OK(client_.AuthorizeActionMultipleTables("jdoe", ActionPB::METADATA, &tables));
+  ASSERT_EQ(0, tables.size());
 }
 
 TEST_F(RangerClientTest, TestAuthorizeMetadataAllAuthorizedContainsNonRanger) {
diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index 53371ee..2f0ba21 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -352,10 +352,6 @@ Status RangerClient::AuthorizeActionMultipleTables(const string& user_name,
                                                    const ActionPB& action,
                                                    unordered_set<string>* table_names) {
   DCHECK(subprocess_);
-  // Return immediately if there is no tables to authorize against.
-  if (table_names->empty()) {
-    return Status::OK();
-  }
 
   RangerRequestListPB req_list;
   RangerResponseListPB resp_list;
@@ -391,12 +387,6 @@ Status RangerClient::AuthorizeActionMultipleTables(const string& user_name,
     }
   }
 
-  if (allowed_tables.empty()) {
-    LOG(WARNING) << Substitute("User $0 is not authorized to perform $1 on $2 tables",
-                               user_name, ActionPB_Name(action), table_names->size());
-    return Status::NotAuthorized(kUnauthorizedAction);
-  }
-
   *table_names = move(allowed_tables);
 
   return Status::OK();
diff --git a/src/kudu/ranger/ranger_client.h b/src/kudu/ranger/ranger_client.h
index 1117785..4578f6c 100644
--- a/src/kudu/ranger/ranger_client.h
+++ b/src/kudu/ranger/ranger_client.h
@@ -77,10 +77,8 @@ class RangerClient {
                          const std::string& table_name, Scope scope = Scope::TABLE)
       WARN_UNUSED_RESULT;
 
-  // Authorizes action on multiple tables. If there is at least one table that
-  // user is authorized to perform the action on, it sets 'table_names' to the
-  // tables the user is authorized to access and returns OK, NotAuthorized
-  // otherwise.
+  // Authorizes action on multiple tables. It sets 'table_names' to the
+  // tables the user is authorized to access and returns OK.
   Status AuthorizeActionMultipleTables(const std::string& user_name, const ActionPB& action,
                                        std::unordered_set<std::string>* table_names)
       WARN_UNUSED_RESULT;