You are viewing a plain text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@kudu.apache.org by to...@apache.org on 2020/04/07 22:21:14 UTC

[kudu] branch master updated (80fb9a8 -> 36a21d6)

This is an automated email from the ASF dual-hosted git repository.

todd pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 80fb9a8  [python] KUDU-2632 Add DATE type support
     new ffa3347  util: remove duplicate results from DNS resolution
     new cd65d50  [ranger] update description of Ranger integration related flags
     new 96259c8  allow skip block manager in some ops of local_replica tools
     new 36a21d6  wire_protocol: change columnar serialization of varlen data to match Arrow

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/kudu/client/client-test.cc              |  18 ++--
 src/kudu/client/columnar_scan_batch.cc      |   8 +-
 src/kudu/client/columnar_scan_batch.h       |  25 ++++-
 src/kudu/client/scanner-internal.cc         |  99 +++++++++++++-------
 src/kudu/client/scanner-internal.h          |  10 +-
 src/kudu/common/columnar_serialization.cc   | 136 ++++++++++++++++++++--------
 src/kudu/common/columnar_serialization.h    |   4 +-
 src/kudu/common/rowblock.h                  |  15 +++
 src/kudu/common/wire_protocol-test.cc       |  24 ++---
 src/kudu/common/wire_protocol.proto         |   6 +-
 src/kudu/ranger/ranger_client.cc            |  27 +++---
 src/kudu/tools/tool_action_local_replica.cc |  23 +++--
 src/kudu/tserver/tablet_server-test.cc      |  16 ++--
 src/kudu/tserver/tablet_service.cc          |  10 +-
 src/kudu/util/net/net_util.cc               |   9 +-
 15 files changed, 289 insertions(+), 141 deletions(-)


[kudu] 03/04: allow skip block manager in some ops of local_replica tools

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 96259c889a73516c021adf1051627c2946be05bb
Author: wangning <19...@gmail.com>
AuthorDate: Mon Apr 6 13:24:12 2020 +0800

    allow skip block manager in some ops of local_replica tools
    
    These CLI ops can be sped up by skipping opening the block manager:
    
    - local_replica cmeta print_replica_uuids
    - local_replica cmeta rewrite_raft_config
    - local_replica cmeta set_term
    - local_replica dump wals
    - local_replica list
    
    Change-Id: I01e22354fdb76596008cd0824e240d24a8f20099
    Reviewed-on: http://gerrit.cloudera.org:8080/15656
    Tested-by: Andrew Wong <aw...@cloudera.com>
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/tools/tool_action_local_replica.cc | 23 +++++++++++++----------
 1 file changed, 13 insertions(+), 10 deletions(-)

diff --git a/src/kudu/tools/tool_action_local_replica.cc b/src/kudu/tools/tool_action_local_replica.cc
index 12b197a..67a2d1a 100644
--- a/src/kudu/tools/tool_action_local_replica.cc
+++ b/src/kudu/tools/tool_action_local_replica.cc
@@ -155,9 +155,10 @@ string Indent(int indent) {
   return string(indent, ' ');
 }
 
-Status FsInit(unique_ptr<FsManager>* fs_manager) {
+Status FsInit(bool skip_block_manager, unique_ptr<FsManager>* fs_manager) {
   FsManagerOpts fs_opts;
   fs_opts.read_only = true;
+  fs_opts.skip_block_manager = skip_block_manager;
   fs_opts.update_instances = fs::UpdateInstanceBehavior::DONT_UPDATE;
   unique_ptr<FsManager> fs_ptr(new FsManager(Env::Default(), fs_opts));
   RETURN_NOT_OK(fs_ptr->Open());
@@ -238,7 +239,7 @@ Status ParsePeerString(const string& peer_str,
 
 Status PrintReplicaUuids(const RunnerContext& context) {
   unique_ptr<FsManager> fs_manager;
-  RETURN_NOT_OK(FsInit(&fs_manager));
+  RETURN_NOT_OK(FsInit(/*skip_block_manager*/true, &fs_manager));
   scoped_refptr<ConsensusMetadataManager> cmeta_manager(
       new ConsensusMetadataManager(fs_manager.get()));
 
@@ -323,7 +324,9 @@ Status SetRaftTerm(const RunnerContext& context) {
 
   // Load the current metadata from disk and verify that the intended operation is safe.
   Env* env = Env::Default();
-  FsManager fs_manager(env, FsManagerOpts());
+  FsManagerOpts fs_opts = FsManagerOpts();
+  fs_opts.skip_block_manager = true;
+  FsManager fs_manager(env, fs_opts);
   RETURN_NOT_OK(fs_manager.Open());
   // Load the cmeta file and rewrite the raft config.
   scoped_refptr<ConsensusMetadataManager> cmeta_manager(new ConsensusMetadataManager(&fs_manager));
@@ -468,7 +471,7 @@ struct TabletSizeStats {
 Status SummarizeDataSize(const RunnerContext& context) {
   const string& tablet_id_pattern = FindOrDie(context.required_args, kTabletIdGlobArg);
   unique_ptr<FsManager> fs;
-  RETURN_NOT_OK(FsInit(&fs));
+  RETURN_NOT_OK(FsInit(/*skip_block_manager*/false, &fs));
 
   vector<string> tablets;
   RETURN_NOT_OK(fs->ListTabletIds(&tablets));
@@ -525,7 +528,7 @@ Status SummarizeDataSize(const RunnerContext& context) {
 
 Status DumpWals(const RunnerContext& context) {
   unique_ptr<FsManager> fs_manager;
-  RETURN_NOT_OK(FsInit(&fs_manager));
+  RETURN_NOT_OK(FsInit(/*skip_block_manager*/true, &fs_manager));
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
 
   shared_ptr<LogReader> reader;
@@ -576,7 +579,7 @@ Status ListBlocksInRowSet(const Schema& schema,
 
 Status DumpBlockIdsForLocalReplica(const RunnerContext& context) {
   unique_ptr<FsManager> fs_manager;
-  RETURN_NOT_OK(FsInit(&fs_manager));
+  RETURN_NOT_OK(FsInit(/*skip_block_manager*/false, &fs_manager));
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
 
   scoped_refptr<TabletMetadata> meta;
@@ -628,7 +631,7 @@ Status DumpTabletMeta(FsManager* fs_manager,
 
 Status ListLocalReplicas(const RunnerContext& context) {
   unique_ptr<FsManager> fs_manager;
-  RETURN_NOT_OK(FsInit(&fs_manager));
+  RETURN_NOT_OK(FsInit(/*skip_block_manager*/true, &fs_manager));
 
   vector<string> tablets;
   RETURN_NOT_OK(fs_manager->ListTabletIds(&tablets));
@@ -702,7 +705,7 @@ Status DumpRowSetInternal(const IOContext& ctx,
 Status DumpRowSet(const RunnerContext& context) {
   const int kIndent = 2;
   unique_ptr<FsManager> fs_manager;
-  RETURN_NOT_OK(FsInit(&fs_manager));
+  RETURN_NOT_OK(FsInit(/*skip_block_manager*/false, &fs_manager));
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
 
   scoped_refptr<TabletMetadata> meta;
@@ -740,14 +743,14 @@ Status DumpRowSet(const RunnerContext& context) {
 
 Status DumpMeta(const RunnerContext& context) {
   unique_ptr<FsManager> fs_manager;
-  RETURN_NOT_OK(FsInit(&fs_manager));
+  RETURN_NOT_OK(FsInit(/*skip_block_manager*/false, &fs_manager));
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
   return DumpTabletMeta(fs_manager.get(), tablet_id, 0);
 }
 
 Status DumpDataDirs(const RunnerContext& context) {
   unique_ptr<FsManager> fs_manager;
-  RETURN_NOT_OK(FsInit(&fs_manager));
+  RETURN_NOT_OK(FsInit(/*skip_block_manager*/false, &fs_manager));
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
   // Load the tablet meta to make sure the tablet's data directories are loaded
   // into the manager.


[kudu] 01/04: util: remove duplicate results from DNS resolution

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ffa33473bb0c459ec35cbc68e1ea578938b0bd6b
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Sat Apr 4 20:34:00 2020 -0700

    util: remove duplicate results from DNS resolution
    
    On some systems it seems that our DNS resolution code can end up
    yielding multiple copies of the same address. That would produce
    annoying log messages like:
    
      $ kudu table list localhost
      W0404 20:35:05.511526 31378 client-internal.cc:597] Specified master
      server address 'localhost' resolved to multiple IPs. Using
      127.0.0.1:7051
    
    This patch ensures that any given address is only appended to the result
    vector once.
    
    Change-Id: I7d9b9f9839a899d8022f5ac6496555ff84583192
    Reviewed-on: http://gerrit.cloudera.org:8080/15665
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/util/net/net_util.cc | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/src/kudu/util/net/net_util.cc b/src/kudu/util/net/net_util.cc
index 9bdf3c6..f3a78b9 100644
--- a/src/kudu/util/net/net_util.cc
+++ b/src/kudu/util/net/net_util.cc
@@ -196,6 +196,11 @@ Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const {
   LOG_SLOW_EXECUTION(WARNING, 200, op_description) {
     RETURN_NOT_OK(GetAddrInfo(host_, hints, op_description, &result));
   }
+
+  // DNS may return the same host multiple times. We want to return only the unique
+  // addresses, but in the same order as DNS returned them. To do so, we keep track
+  // of the already-inserted elements in a set.
+  unordered_set<Sockaddr> inserted;
   vector<Sockaddr> result_addresses;
   for (const addrinfo* ai = result.get(); ai != nullptr; ai = ai->ai_next) {
     CHECK_EQ(AF_INET, ai->ai_family);
@@ -204,7 +209,9 @@ Status HostPort::ResolveAddresses(vector<Sockaddr>* addresses) const {
     Sockaddr sockaddr(*addr);
     VLOG(2) << Substitute("resolved address $0 for host/port $1",
                           sockaddr.ToString(), ToString());
-    result_addresses.emplace_back(sockaddr);
+    if (InsertIfNotPresent(&inserted, sockaddr)) {
+      result_addresses.emplace_back(sockaddr);
+    }
   }
   if (PREDICT_FALSE(FLAGS_fail_dns_resolution)) {
     return Status::NetworkError("injected DNS resolution failure");


[kudu] 04/04: wire_protocol: change columnar serialization of varlen data to match Arrow

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 36a21d6ee4de8828417d7884cf87cd5e2ba15a21
Author: Todd Lipcon <to...@apache.org>
AuthorDate: Mon Apr 6 10:29:39 2020 -0700

    wire_protocol: change columnar serialization of varlen data to match Arrow
    
    This changes the format of variable-length columns serialized on the
    wire to match Apache Arrow instead of our internal column format. The
    Arrow format consists of an array of n+1 offsets for n rows, such that
    the data for cell 'i' spans offsets[i] (inclusive) to offsets[i+1]
    (exclusive).
    
    The obvious advantage here is that clients can zero-copy into Arrow
    structures since the format is compatible. The less obvious advantage is
    that we are going from 16 bytes (sizeof(Slice)) to 4 bytes
    (sizeof(uint32_t), one offset) for each serialized string, so this should
    save space even for non-Arrow users of the API.
    
    This patch also adds some more sanity checking of the wire format in the
    client API so that it's not the responsibility of the caller to guard
    against malicious servers.
    
    Change-Id: Iadf728744feb83f5980e62bea4fd7634a1a52467
    Reviewed-on: http://gerrit.cloudera.org:8080/15660
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client-test.cc            |  18 ++--
 src/kudu/client/columnar_scan_batch.cc    |   8 +-
 src/kudu/client/columnar_scan_batch.h     |  25 ++++--
 src/kudu/client/scanner-internal.cc       |  99 +++++++++++++++-------
 src/kudu/client/scanner-internal.h        |  10 +--
 src/kudu/common/columnar_serialization.cc | 136 ++++++++++++++++++++++--------
 src/kudu/common/columnar_serialization.h  |   4 +-
 src/kudu/common/rowblock.h                |  15 ++++
 src/kudu/common/wire_protocol-test.cc     |  24 +++---
 src/kudu/common/wire_protocol.proto       |   6 +-
 src/kudu/tserver/tablet_server-test.cc    |  16 ++--
 src/kudu/tserver/tablet_service.cc        |  10 +--
 12 files changed, 253 insertions(+), 118 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index d500d6e..6961ff3 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -1065,15 +1065,18 @@ TEST_F(ClientTest, TestColumnarScan) {
 
     // Verify the data.
     Slice col_data[4];
-    for (int i = 0; i < 4; i++) {
-      ASSERT_OK(batch.GetDataForColumn(i, &col_data[i]));
-    }
+    Slice string_indir_data;
+    ASSERT_OK(batch.GetFixedLengthColumn(0, &col_data[0]));
+    ASSERT_OK(batch.GetFixedLengthColumn(1, &col_data[1]));
+    ASSERT_OK(batch.GetVariableLengthColumn(2, &col_data[2], &string_indir_data));
+    ASSERT_OK(batch.GetFixedLengthColumn(3, &col_data[3]));
+
     ArrayView<const int32_t> c0(reinterpret_cast<const int32_t*>(col_data[0].data()),
                                 batch.NumRows());
     ArrayView<const int32_t> c1(reinterpret_cast<const int32_t*>(col_data[1].data()),
                                 batch.NumRows());
-    ArrayView<const Slice> c2(reinterpret_cast<const Slice*>(col_data[2].data()),
-                              batch.NumRows());
+    ArrayView<const uint32_t> c2_offsets(reinterpret_cast<const uint32_t*>(col_data[2].data()),
+                                         batch.NumRows() + 1);
     ArrayView<const int32_t> c3(reinterpret_cast<const int32_t*>(col_data[3].data()),
                                 batch.NumRows());
 
@@ -1081,7 +1084,10 @@ TEST_F(ClientTest, TestColumnarScan) {
       int row_idx = total_rows + i;
       EXPECT_EQ(row_idx, c0[i]);
       EXPECT_EQ(row_idx * 2, c1[i]);
-      EXPECT_EQ(Substitute("hello $0", row_idx), c2[i]);
+
+      Slice str(&string_indir_data[c2_offsets[i]],
+                c2_offsets[i + 1] - c2_offsets[i]);
+      EXPECT_EQ(Substitute("hello $0", row_idx), str);
       EXPECT_EQ(row_idx * 3, c3[i]);
     }
     total_rows += batch.NumRows();
diff --git a/src/kudu/client/columnar_scan_batch.cc b/src/kudu/client/columnar_scan_batch.cc
index b346445..c764665 100644
--- a/src/kudu/client/columnar_scan_batch.cc
+++ b/src/kudu/client/columnar_scan_batch.cc
@@ -38,8 +38,12 @@ int KuduColumnarScanBatch::NumRows() const {
   return data_->resp_data_.num_rows();
 }
 
-Status KuduColumnarScanBatch::GetDataForColumn(int idx, Slice* data) const {
-  return data_->GetDataForColumn(idx, data);
+Status KuduColumnarScanBatch::GetFixedLengthColumn(int idx, Slice* data) const {
+  return data_->GetFixedLengthColumn(idx, data);
+}
+
+Status KuduColumnarScanBatch::GetVariableLengthColumn(int idx, Slice* offsets, Slice* data) const {
+  return data_->GetVariableLengthColumn(idx, offsets, data);
 }
 
 Status KuduColumnarScanBatch::GetNonNullBitmapForColumn(int idx, Slice* data) const {
diff --git a/src/kudu/client/columnar_scan_batch.h b/src/kudu/client/columnar_scan_batch.h
index 23a3e7d..924c162 100644
--- a/src/kudu/client/columnar_scan_batch.h
+++ b/src/kudu/client/columnar_scan_batch.h
@@ -41,6 +41,12 @@ namespace client {
 /// some CPU cycles on the Kudu cluster and can also enable faster processing of the
 /// returned data in certain client applications.
 ///
+/// The columnar data retrieved by this class matches the columnar encoding described by
+/// Apache Arrow[1], but without the alignment and padding guarantees that are made by
+/// the Arrow IPC serialization.
+///
+/// [1] https://arrow.apache.org/docs/format/Columnar.html
+///
 /// NOTE: this class is not thread-safe.
 class KUDU_EXPORT KuduColumnarScanBatch {
  public:
@@ -50,17 +56,26 @@ class KUDU_EXPORT KuduColumnarScanBatch {
   /// @return The number of rows in this batch.
   int NumRows() const;
 
-  /// Get the raw columnar data corresponding to the column with index 'idx'.
+  /// Get the raw columnar data corresponding to the primitive-typed column with index 'idx'.
   ///
-  /// The data is in little-endian packed array format. No alignment is guaranteed.
+  /// The data is in little-endian packed array format. No alignment or padding is guaranteed.
   /// Space is reserved for all cells regardless of whether they might be null.
   /// The data stored in a null cell may or may not be zeroed.
   ///
-  /// If this returns an error for a given column, then a second call for the same
-  /// column has undefined results.
+  /// For variable-length (e.g. STRING, BINARY, VARCHAR) columns, use
+  /// GetVariableLengthColumn instead.
   ///
   /// @note The Slice returned is only valid for the lifetime of the KuduColumnarScanBatch.
-  Status GetDataForColumn(int idx, Slice* data) const;
+  Status GetFixedLengthColumn(int idx, Slice* data) const;
+
+  /// Return the variable-length data for the variable-length-typed column with index 'idx'.
+  ///
+  /// If NumRows() is 0, the 'offsets' array will have length 0. Otherwise, this array
+  /// will contain NumRows() + 1 entries, each indicating an offset within the
+  /// variable-length data array returned in 'data'. For each cell with index 'n',
+  /// offsets[n] indicates the starting offset of that cell, and offsets[n+1] indicates
+  /// the ending offset of that cell.
+  Status GetVariableLengthColumn(int idx, Slice* offsets, Slice* data) const;
 
   /// Get a bitmap corresponding to the non-null status of the cells in the given column.
   ///
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 93006b3..75bcbd3 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -50,13 +50,11 @@
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/security/token.pb.h"
 #include "kudu/tserver/tserver_service.proxy.h"
-#include "kudu/util/array_view.h"
 #include "kudu/util/async_util.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/hexdump.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/monotime.h"
-#include "kudu/util/safe_math.h"
 
 using google::protobuf::FieldDescriptor;
 using google::protobuf::Reflection;
@@ -786,7 +784,6 @@ Status KuduColumnarScanBatch::Data::Reset(
   controller_.Swap(controller);
   projection_ = projection;
   client_projection_ = client_projection;
-  rewritten_varlen_columns_.clear();
 
   unique_ptr<ColumnarRowBlockPB> resp_data(response->release_columnar_data());
   if (!resp_data) {
@@ -811,48 +808,86 @@ Status KuduColumnarScanBatch::Data::CheckColumnIndex(int idx) const {
   return Status::OK();
 }
 
-Status KuduColumnarScanBatch::Data::GetDataForColumn(int idx, Slice* data) const {
+Status KuduColumnarScanBatch::Data::GetFixedLengthColumn(int idx, Slice* data) const {
   RETURN_NOT_OK(CheckColumnIndex(idx));
+  const auto& col = projection_->column(idx);
+  if (PREDICT_FALSE(col.type_info()->physical_type() == BINARY)) {
+    return Status::InvalidArgument("column is variable-length", col.ToString());
+  }
+
+  // Get the sidecar from the RPC.
+  if (PREDICT_FALSE(!resp_data_.columns(idx).has_data_sidecar())) {
+    return Status::Corruption("server did not send data for column", col.ToString());
+  }
   RETURN_NOT_OK(controller_.GetInboundSidecar(
       resp_data_.columns(idx).data_sidecar(),
       data));
 
-  // Rewrite slices to be real pointers instead of pointers relative to the
-  // indirect data buffer.
-  if (projection_->column(idx).type_info()->physical_type() == BINARY &&
-      !ContainsKey(rewritten_varlen_columns_, idx)) {
-
-    Slice indirect_data_slice;
-    RETURN_NOT_OK(controller_.GetInboundSidecar(
-        resp_data_.columns(idx).indirect_data_sidecar(),
-        &indirect_data_slice));
-
-    ArrayView<Slice> v(reinterpret_cast<Slice*>(data->mutable_data()),
-                       data->size() / sizeof(Slice));
-    for (int row_idx = 0; row_idx < v.size(); row_idx++) {
-      Slice* slice = &v[row_idx];
-      size_t offset_in_indirect = reinterpret_cast<uintptr_t>(slice->data());
-      // Ensure the updated pointer is within the bounds of the indirect data.
-      bool overflowed = false;
-      size_t max_offset = AddWithOverflowCheck(offset_in_indirect, slice->size(), &overflowed);
-      if (PREDICT_FALSE(overflowed || max_offset > indirect_data_slice.size())) {
-        const auto& col = projection_->column(idx);
-        return Status::Corruption(
-            Substitute("Row #$0 contained bad indirect slice for column $1: ($2, $3)",
-                       row_idx, col.ToString(), offset_in_indirect, slice->size()));
-      }
-      *slice = Slice(&indirect_data_slice[offset_in_indirect], slice->size());
+  size_t expected_size = resp_data_.num_rows() * col.type_info()->size();
+  if (PREDICT_FALSE(data->size() != expected_size)) {
+    return Status::Corruption(Substitute(
+        "server sent unexpected data length $0 for column $1 (expected $2)",
+        data->size(), col.ToString(), expected_size));
+  }
+  return Status::OK();
+}
+
+Status KuduColumnarScanBatch::Data::GetVariableLengthColumn(
+    int idx, Slice* offsets, Slice* data) const {
+  RETURN_NOT_OK(CheckColumnIndex(idx));
+  const auto& col = projection_->column(idx);
+  if (PREDICT_FALSE(col.type_info()->physical_type() != BINARY)) {
+    return Status::InvalidArgument("column is not variable-length", col.ToString());
+  }
+  const auto& resp_col = resp_data_.columns(idx);
+
+  // Get the offsets.
+  Slice offsets_tmp;
+  if (PREDICT_FALSE(!resp_col.has_data_sidecar())) {
+    return Status::Corruption("server did not send offset data for column", col.ToString());
+  }
+  RETURN_NOT_OK(controller_.GetInboundSidecar(
+      resp_col.data_sidecar(),
+      &offsets_tmp));
+
+  // Get the varlen data.
+  Slice data_tmp;
+  if (PREDICT_FALSE(!resp_col.has_varlen_data_sidecar())) {
+    return Status::Corruption("server did not send varlen data for column", col.ToString());
+  }
+  RETURN_NOT_OK(controller_.GetInboundSidecar(
+      resp_col.varlen_data_sidecar(),
+      &data_tmp));
+
+  // Validate the offsets.
+  auto expected_num_offsets = resp_data_.num_rows() == 0 ? 0 : (resp_data_.num_rows() + 1);
+  auto expected_size = expected_num_offsets * sizeof(uint32_t);
+  if (PREDICT_FALSE(offsets_tmp.size() != expected_size)) {
+    return Status::Corruption(Substitute("size $0 of offsets buffer for column $1 did not "
+                                         "match expected size $2",
+                                         offsets_tmp.size(), col.ToString(), expected_size));
+  }
+  for (int i = 0; i < resp_data_.num_rows(); i++) {
+    uint32_t offset = UnalignedLoad<uint32_t>(offsets_tmp.data() + i * sizeof(uint32_t));
+    if (PREDICT_FALSE(offset > data_tmp.size())) {
+      return Status::Corruption(Substitute(
+          "invalid offset $0 returned for column $1 at index $2 (max valid offset is $3)",
+          offset, col.ToString(), i, data_tmp.size()));
     }
-    rewritten_varlen_columns_.insert(idx);
   }
+
+  *offsets = offsets_tmp;
+  *data = data_tmp;
+
   return Status::OK();
 }
 
 Status KuduColumnarScanBatch::Data::GetNonNullBitmapForColumn(int idx, Slice* data) const {
   RETURN_NOT_OK(CheckColumnIndex(idx));
   const auto& col = resp_data_.columns(idx);
-  if (!col.has_non_null_bitmap_sidecar()) {
-    return Status::NotFound("column is not nullable");
+  if (PREDICT_FALSE(!col.has_non_null_bitmap_sidecar())) {
+    return Status::Corruption(Substitute("server did not send null bitmap for column $0",
+                                         projection_->column(idx).ToString()));
   }
   return controller_.GetInboundSidecar(col.non_null_bitmap_sidecar(), data);
 }
diff --git a/src/kudu/client/scanner-internal.h b/src/kudu/client/scanner-internal.h
index 01da884..98e89ea 100644
--- a/src/kudu/client/scanner-internal.h
+++ b/src/kudu/client/scanner-internal.h
@@ -23,7 +23,6 @@
 #include <ostream>
 #include <set>
 #include <string>
-#include <unordered_set>
 #include <vector>
 
 #include <glog/logging.h>
@@ -378,7 +377,8 @@ class KuduColumnarScanBatch::Data : public internal::ScanBatchDataInterface {
                tserver::ScanResponsePB* response) override;
   void Clear() override;
 
-  Status GetDataForColumn(int idx, Slice* data) const;
+  Status GetFixedLengthColumn(int idx, Slice* data) const;
+  Status GetVariableLengthColumn(int idx, Slice* offsets, Slice* data) const;
   Status GetNonNullBitmapForColumn(int idx, Slice* data) const;
 
  private:
@@ -394,12 +394,6 @@ class KuduColumnarScanBatch::Data : public internal::ScanBatchDataInterface {
   // The PB which contains the "direct data" slice.
   ColumnarRowBlockPB resp_data_;
 
-  // Tracks for each variable-length (binary) column whether the pointers have been
-  // rewritten yet to be "real" pointers instead of sidecar-relative offsets.
-  // Mutable since the 'GetDataForColumn' call is semantically const, but in fact
-  // needs to modify this member to do the lazy pointer rewrites.
-  mutable std::unordered_set<int> rewritten_varlen_columns_;
-
   // The projection being scanned.
   const Schema* projection_;
   // The KuduSchema version of 'projection_'
diff --git a/src/kudu/common/columnar_serialization.cc b/src/kudu/common/columnar_serialization.cc
index 47060de..eaa8469 100644
--- a/src/kudu/common/columnar_serialization.cc
+++ b/src/kudu/common/columnar_serialization.cc
@@ -35,6 +35,7 @@
 #include "kudu/common/zp7.h"
 #include "kudu/gutil/cpu.h"
 #include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/fastmem.h"
 #include "kudu/util/alignment.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/slice.h"
@@ -449,32 +450,6 @@ void CopySelectedRows(const vector<uint16_t>& sel_rows,
 }
 
 namespace {
-// For each of the Slices in 'cells_buf', copy the pointed-to data into 'indirect' and
-// modify the Slice so that its 'pointer' field is not an actual memory pointer, but
-// rather an offset within the indirect data buffer.
-void RelocateSlicesToIndirect(uint8_t* __restrict__ cells_buf, int n_rows,
-                              faststring* indirect) {
-  Slice* cell_slices = reinterpret_cast<Slice*>(cells_buf);
-  size_t total_size = 0;
-  for (int i = 0; i < n_rows; i++) {
-    total_size += cell_slices[i].size();
-  }
-
-  int old_size = indirect->size();
-  indirect->resize_with_extra_capacity(old_size + total_size);
-
-  uint8_t* dst_base = indirect->data();
-  uint8_t* dst = dst_base + old_size;
-
-  for (int i = 0; i < n_rows; i++) {
-    Slice* s = &cell_slices[i];
-    if (!s->empty()) {
-      memcpy(dst, s->data(), s->size());
-    }
-    *s = Slice(reinterpret_cast<const uint8_t*>(dst - dst_base), s->size());
-    dst += s->size();
-  }
-}
 
 // Specialized division for the known type sizes. Despite having some branching here,
 // this is faster than a 'div' instruction which has a 20+ cycle latency.
@@ -489,12 +464,12 @@ size_t div_sizeof_type(size_t s, size_t divisor) {
   }
 }
 
-
-// Copy the selected cells (and non-null-bitmap bits) from 'cblock' into 'dst' according to
-// the given 'sel_rows'.
+// Copy the selected primitive cells (and non-null-bitmap bits) from 'cblock' into 'dst'
+// according to the given 'sel_rows'.
 void CopySelectedCellsFromColumn(const ColumnBlock& cblock,
                                  const SelectedRows& sel_rows,
                                  ColumnarSerializedBatch::Column* dst) {
+  DCHECK(cblock.type_info()->physical_type() != BINARY);
   size_t sizeof_type = cblock.type_info()->size();
   int n_sel = sel_rows.num_selected();
 
@@ -523,11 +498,89 @@ void CopySelectedCellsFromColumn(const ColumnBlock& cblock,
     ZeroNullValues(sizeof_type, initial_rows, n_sel,
         dst->data.data(), dst->non_null_bitmap->data());
   }
+}
+
+
+// For each of the Slices in 'cells_buf', copy the pointed-to data into 'varlen' and
+// write the _end_ offset of the copied data into 'offsets_out'. This assumes (and
+// DCHECKs) that the _start_ offset of each cell was already previously written by a
+// previous invocation of this function.
+void CopySlicesAndWriteEndOffsets(const Slice* __restrict__ cells_buf,
+                                  const SelectedRows& sel_rows,
+                                  uint32_t* __restrict__ offsets_out,
+                                  faststring* varlen) {
+  const Slice* cell_slices = reinterpret_cast<const Slice*>(cells_buf);
+  size_t total_added_size = 0;
+  sel_rows.ForEachIndex(
+      [&](uint16_t i) {
+        total_added_size += cell_slices[i].size();
+      });
+
+  // The output array should already have an entry for the start offset
+  // of our first cell.
+  DCHECK_EQ(offsets_out[-1], varlen->size());
+
+  int old_size = varlen->size();
+  varlen->resize_with_extra_capacity(old_size + total_added_size);
+
+  uint8_t* dst_base = varlen->data();
+  uint8_t* dst = dst_base + old_size;
+
+  sel_rows.ForEachIndex(
+      [&](uint16_t i) {
+        const Slice* s = &cell_slices[i];
+        if (!s->empty()) {
+          strings::memcpy_inlined(dst, s->data(), s->size());
+        }
+        dst += s->size();
+        *offsets_out++ = dst - dst_base;
+      });
+}
+
+// Copy variable-length cells into 'dst' using an Arrow-style serialization:
+// a list of offsets in the 'data' array and the data itself in the 'varlen_data'
+// array.
+//
+// The offset array has a length one greater than the number of cells.
+void CopySelectedVarlenCellsFromColumn(const ColumnBlock& cblock,
+                                       const SelectedRows& sel_rows,
+                                       ColumnarSerializedBatch::Column* dst) {
+  using offset_type = uint32_t;
+  DCHECK(cblock.type_info()->physical_type() == BINARY);
+  int n_sel = sel_rows.num_selected();
+  DCHECK_GT(n_sel, 0);
+
+  // If this is the first call, append a '0' entry for the offset of the first string.
+  if (dst->data.size() == 0) {
+    CHECK_EQ(dst->varlen_data->size(), 0);
+    offset_type zero_offset = 0;
+    dst->data.append(&zero_offset, sizeof(zero_offset));
+  }
 
-  if (cblock.type_info()->physical_type() == BINARY) {
-    RelocateSlicesToIndirect(dst_buf, n_sel, boost::get_pointer(dst->indirect_data));
+  // Number of initial rows in the dst values and null_bitmap.
+  DCHECK_EQ(dst->data.size() % sizeof(offset_type), 0);
+  size_t initial_offset_count = div_sizeof_type(dst->data.size(), sizeof(offset_type));
+  size_t initial_rows = initial_offset_count - 1;
+  size_t new_offset_count = initial_offset_count + n_sel;
+  size_t new_num_rows = initial_rows + n_sel;
+
+  if (cblock.is_nullable()) {
+    DCHECK_EQ(dst->non_null_bitmap->size(), BitmapSize(initial_rows));
+    dst->non_null_bitmap->resize_with_extra_capacity(BitmapSize(new_num_rows));
+    CopyNonNullBitmap(cblock.non_null_bitmap(),
+                      sel_rows.bitmap(),
+                      initial_rows, cblock.nrows(),
+                      dst->non_null_bitmap->data());
+    ZeroNullValues(sizeof(Slice), 0, cblock.nrows(),
+                   const_cast<ColumnBlock&>(cblock).data(), cblock.non_null_bitmap());
   }
+  dst->data.resize_with_extra_capacity(sizeof(offset_type) * new_offset_count);
+  offset_type* dst_offset = reinterpret_cast<offset_type*>(dst->data.data()) + initial_offset_count;
+  const Slice* src_slices = reinterpret_cast<const Slice*>(cblock.cell_ptr(0));
+  CopySlicesAndWriteEndOffsets(src_slices, sel_rows,
+                               dst_offset, boost::get_pointer(dst->varlen_data));
 }
+
 } // anonymous namespace
 } // namespace internal
 
@@ -552,7 +605,7 @@ int SerializeRowBlockColumnar(
       out->columns.emplace_back();
       out->columns.back().data.reserve(1024 * 1024);
       if (col.type_info()->physical_type() == BINARY) {
-        out->columns.back().indirect_data.emplace();
+        out->columns.back().varlen_data.emplace();
       }
       if (col.is_nullable()) {
         out->columns.back().non_null_bitmap.emplace();
@@ -561,16 +614,27 @@ int SerializeRowBlockColumnar(
   }
 
   SelectedRows sel = block.selection_vector()->GetSelectedRows();
+  if (sel.num_selected() == 0) {
+    return 0;
+  }
+
   int col_idx = 0;
   for (const auto& col : projection_schema->columns()) {
     int t_schema_idx = tablet_schema->find_column(col.name());
     CHECK_NE(t_schema_idx, -1);
     const ColumnBlock& column_block = block.column_block(t_schema_idx);
 
-    internal::CopySelectedCellsFromColumn(
-        column_block,
-        sel,
-        &out->columns[col_idx]);
+    if (column_block.type_info()->physical_type() == BINARY) {
+      internal::CopySelectedVarlenCellsFromColumn(
+          column_block,
+          sel,
+          &out->columns[col_idx]);
+    } else {
+      internal::CopySelectedCellsFromColumn(
+          column_block,
+          sel,
+          &out->columns[col_idx]);
+    }
     col_idx++;
   }
 
diff --git a/src/kudu/common/columnar_serialization.h b/src/kudu/common/columnar_serialization.h
index d7ad494..bc74f08 100644
--- a/src/kudu/common/columnar_serialization.h
+++ b/src/kudu/common/columnar_serialization.h
@@ -35,8 +35,8 @@ struct ColumnarSerializedBatch {
     // Underlying column data.
     faststring data;
 
-    // Data for varlen columns (BINARY)
-    boost::optional<faststring> indirect_data;
+    // Data for varlen columns (those with BINARY physical type)
+    boost::optional<faststring> varlen_data;
 
     // Each bit is set when a value is non-null
     boost::optional<faststring> non_null_bitmap;
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 6e86e5f..0db69d2 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -258,6 +258,21 @@ class SelectedRows {
     return CreateRowIndexes();
   }
 
+  // Call F(index) for each selected index.
+  template<class F>
+  void ForEachIndex(F func) const {
+    if (all_selected_) {
+      int n_sel = num_selected();
+      for (int i = 0; i < n_sel; i++) {
+        func(i);
+      }
+    } else {
+      for (uint16_t i : indexes_) {
+        func(i);
+      }
+    }
+  }
+
  private:
   friend class SelectionVector;
 
diff --git a/src/kudu/common/wire_protocol-test.cc b/src/kudu/common/wire_protocol-test.cc
index ae18f1e..4a1c791 100644
--- a/src/kudu/common/wire_protocol-test.cc
+++ b/src/kudu/common/wire_protocol-test.cc
@@ -335,19 +335,21 @@ TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
           }
         }
         int type_size = col.type_info()->size();
-        Slice serialized_val(serialized_col.data.data() + type_size * dst_row_idx,
-                             type_size);
-        Slice orig_val(row.cell_ptr(c), type_size);
-
+        Slice serialized_val;
+        Slice orig_val;
         if (col.type_info()->physical_type() == BINARY) {
-          orig_val = *reinterpret_cast<const Slice*>(orig_val.data());
-          serialized_val = *reinterpret_cast<const Slice*>(serialized_val.data());
-
-          uintptr_t indirect_offset = reinterpret_cast<uintptr_t>(serialized_val.data());
-          serialized_val = Slice(serialized_col.indirect_data->data() + indirect_offset,
-                                 serialized_val.size());
+          const uint8_t* offset_ptr = serialized_col.data.data() + sizeof(uint32_t) * dst_row_idx;
+          uint32_t start_offset = UnalignedLoad<uint32_t>(offset_ptr);
+          uint32_t end_offset = UnalignedLoad<uint32_t>(offset_ptr + sizeof(uint32_t));
+          ASSERT_GE(end_offset, start_offset);
+          serialized_val = Slice(serialized_col.varlen_data->data() + start_offset,
+                                 end_offset - start_offset);
+          memcpy(&orig_val, row.cell_ptr(c), type_size);
+        } else {
+          serialized_val = Slice(serialized_col.data.data() + type_size * dst_row_idx,
+                                 type_size);
+          orig_val = Slice(row.cell_ptr(c), type_size);
         }
-
         EXPECT_EQ(orig_val, serialized_val);
       }
       dst_row_idx++;
diff --git a/src/kudu/common/wire_protocol.proto b/src/kudu/common/wire_protocol.proto
index 5ddba152..3b3bec2 100644
--- a/src/kudu/common/wire_protocol.proto
+++ b/src/kudu/common/wire_protocol.proto
@@ -158,9 +158,9 @@ message ColumnarRowBlockPB {
     optional int32 data_sidecar = 1;
 
     // The index of the sidecar containing any data referred to by binary/string data.
-    // In this case, the 'pointer' field of the Slices contained in the data sidecar
-    // will refer to offsets in this sidecar.
-    optional int32 indirect_data_sidecar = 2;
+    // In this case, the `data` sidecar will contain an array of `num_rows + 1` uint32s
+    // pointing to offsets in this sidecar.
+    optional int32 varlen_data_sidecar = 2;
 
     // If the column is nullable, The index of the sidecar containing a bitmap with a set
     // bit for all non-null cells.
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index c47a5e9..fab5ad4 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -2591,17 +2591,17 @@ TEST_F(TabletServerTest, TestColumnarScan) {
     Slice col_data;
     ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).data_sidecar(), &col_data));
 
-    Slice indirect_data;
-    ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).indirect_data_sidecar(),
-                                    &indirect_data));
+    Slice varlen_data;
+    ASSERT_OK(rpc.GetInboundSidecar(resp.columnar_data().columns(2).varlen_data_sidecar(),
+                                    &varlen_data));
 
     SCOPED_TRACE(col_data.ToDebugString());
-    ASSERT_EQ(col_data.size(), kNumRows * sizeof(Slice));
-    ArrayView<const Slice> cells(reinterpret_cast<const Slice*>(col_data.data()), kNumRows);
+    ASSERT_EQ(col_data.size(), (kNumRows + 1) * sizeof(uint32_t));
+    ArrayView<const uint32_t> offsets(reinterpret_cast<const uint32_t*>(col_data.data()),
+                                      kNumRows + 1);
     for (int i = 0; i < kNumRows; i++) {
-      Slice s = cells[i];
-      Slice real_str(indirect_data.data() + reinterpret_cast<uintptr_t>(s.data()),
-                     s.size());
+      Slice real_str(varlen_data.data() + offsets[i],
+                     offsets[i + 1] - offsets[i]);
       ASSERT_EQ(Substitute("hello $0", i), real_str);
     }
   }
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index c72d223..1bb93ff 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -817,8 +817,8 @@ class ColumnarResultSerializer : public ResultSerializer {
     int total = 0;
     for (const auto& col : results_.columns) {
       total += col.data.size();
-      if (col.indirect_data) {
-        total += col.indirect_data->size();
+      if (col.varlen_data) {
+        total += col.varlen_data->size();
       }
       if (col.non_null_bitmap) {
         total += col.non_null_bitmap->size();
@@ -838,10 +838,10 @@ class ColumnarResultSerializer : public ResultSerializer {
           RpcSidecar::FromFaststring((std::move(col.data))), &sidecar_idx));
       col_pb->set_data_sidecar(sidecar_idx);
 
-      if (col.indirect_data) {
+      if (col.varlen_data) {
         CHECK_OK(context->AddOutboundSidecar(
-            RpcSidecar::FromFaststring((std::move(*col.indirect_data))), &sidecar_idx));
-        col_pb->set_indirect_data_sidecar(sidecar_idx);
+            RpcSidecar::FromFaststring((std::move(*col.varlen_data))), &sidecar_idx));
+        col_pb->set_varlen_data_sidecar(sidecar_idx);
       }
 
       if (col.non_null_bitmap) {


[kudu] 02/04: [ranger] update description of Ranger integration related flags

Posted by to...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

todd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit cd65d50a464dba0aef0738f02cf4b08378099fb9
Author: Hao Hao <ha...@cloudera.com>
AuthorDate: Mon Apr 6 21:41:09 2020 -0700

    [ranger] update description of Ranger integration related flags
    
    This patch updates the descriptions of the Ranger-related flags to
    make it clearer how to enable Ranger integration.
    
    Change-Id: If782f022ada6606b31d720f1dba793af45e8003e
    Reviewed-on: http://gerrit.cloudera.org:8080/15670
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/ranger/ranger_client.cc | 27 +++++++++++++++------------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/src/kudu/ranger/ranger_client.cc b/src/kudu/ranger/ranger_client.cc
index c661f69..00352e2 100644
--- a/src/kudu/ranger/ranger_client.cc
+++ b/src/kudu/ranger/ranger_client.cc
@@ -49,22 +49,25 @@
 #include "kudu/util/string_case.h"
 #include "kudu/util/subprocess.h"
 
-DEFINE_string(ranger_java_path, "",
-              "The path where the Java binary was installed. If "
-              "the value isn't an absolute path (e.g. 'java'), it will be "
-              "evaluated using the Kudu user's PATH. Empty string means "
-              "$JAVA_HOME/bin/java is used. If $JAVA_HOME is not found, Kudu "
-              "will attempt to find 'java' in $PATH.");
-
 DEFINE_string(ranger_config_path, "",
               "Path to directory containing Ranger client configuration. "
-              "Enables Ranger authorization provider. "
-              "sentry_service_rpc_addresses must not be set if this is "
-              "enabled.");
+              "When set, Ranger integration is enabled, fine-grained access "
+              "control is enforced, and clients are issued authorization "
+              "tokens. In addition, both --ranger_java_path and --ranger_jar_path "
+              "flags need to be set properly for Ranger integration to work. "
+              "The --sentry_service_rpc_addresses flag, which enables Sentry "
+              "integration, must not be set if this is enabled.");
+
+DEFINE_string(ranger_java_path, "",
+              "Path where the Java binary was installed. If the value "
+              "isn't an absolute path (e.g. 'java'), it will be evaluated "
+              "using the Kudu user's PATH. If not specified, $JAVA_HOME/bin/java "
+              "is used. If $JAVA_HOME is not found, Kudu will attempt to "
+              "find 'java' in the Kudu user's PATH.");
 
 DEFINE_string(ranger_jar_path, "",
-              "Path to the JAR file containing the Ranger subprocess. "
-              "If not set, the default JAR file path is expected to be"
+              "Path to the JAR file containing the Ranger subprocess. If "
+              "not specified, the default JAR file path is expected to be "
               "next to the master binary.");
 
 DEFINE_string(ranger_receiver_fifo_dir, "",