You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2023/02/17 23:32:49 UTC

[kudu] branch master updated: [Client] Add query id to trace the whole query process

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 834de7fcc [Client] Add query id to trace the whole query process
834de7fcc is described below

commit 834de7fccdb5faadb2ca9e1d1e07d4c7882ae0fa
Author: xinghuayu007 <14...@qq.com>
AuthorDate: Sun Aug 14 16:00:02 2022 +0800

    [Client] Add query id to trace the whole query process
    
    In Impala, an SQL statement is translated into a query plan. The
    query plan contains multiple execution units that are
    executed on different hosts of a cluster in a distributed
    manner. A unique query id traces the
    execution process of every execution unit. It is useful
    for analyzing stuck, failed, and otherwise underperforming
    queries: the query id associates all the log records on different
    hosts that belong to one query.
    
    Scanning a Kudu table is split into multiple scan operations
    on the appropriate tablets, one scanner per tablet. When scanning
    a Kudu table using Impala, a corresponding scan node appears
    for every scanner in Impala's query plan. A scanner id is
    used to trace the scan operation on a tablet.
    
    However, there is a gap between Impala and Kudu: we can't trace
    the whole scanning process in Kudu for a query originating in Impala,
    because there is no unique id that associates the execution of all
    the scanners belonging to that query in Kudu.
    
    With this patch, it is now possible to pass Impala's
    query identifier to Kudu so that it can be associated with Kudu's
    scanner ids. This makes it possible to trace a query from Impala
    into Kudu for troubleshooting and debugging.
    
    Change-Id: I9dbae801596726fec1c85ee547128da3179345d9
    Reviewed-on: http://gerrit.cloudera.org:8080/18846
    Tested-by: Alexey Serbin <al...@apache.org>
    Reviewed-by: Alexey Serbin <al...@apache.org>
---
 src/kudu/client/client-test.cc         | 16 +++++++-
 src/kudu/client/client.cc              | 24 ++++++++++++
 src/kudu/client/client.h               | 33 ++++++++++++++++
 src/kudu/client/client.proto           |  6 +++
 src/kudu/client/scan_token-internal.cc |  5 +++
 src/kudu/client/scan_token-internal.h  |  5 +++
 src/kudu/client/scan_token-test.cc     | 69 ++++++++++++++++++++++++++++++++++
 src/kudu/client/scanner-internal.cc    |  4 +-
 src/kudu/tserver/tablet_server-test.cc | 25 ++++++++++++
 src/kudu/tserver/tablet_service.cc     | 20 +++++-----
 src/kudu/tserver/tserver.proto         |  3 ++
 11 files changed, 197 insertions(+), 13 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 104d2a213..e1a5e451c 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -707,7 +707,7 @@ class ClientTest : public KuduTest {
     }
   }
 
-  void DoTestScanWithStringPredicate() {
+  void DoTestScanWithStringPredicate(string query_id = "") {
     KuduScanner scanner(client_table_.get());
     ASSERT_OK(scanner.AddConjunctPredicate(
                   client_table_->NewComparisonPredicate("string_val", KuduPredicate::GREATER_EQUAL,
@@ -715,7 +715,9 @@ class ClientTest : public KuduTest {
     ASSERT_OK(scanner.AddConjunctPredicate(
                   client_table_->NewComparisonPredicate("string_val", KuduPredicate::LESS_EQUAL,
                                                         KuduValue::CopyString("hello 3"))));
-
+    if (!query_id.empty()) {
+      scanner.SetQueryId(query_id);
+    }
     LOG_TIMING(INFO, "Scanning with string predicate") {
       ASSERT_OK(scanner.Open());
 
@@ -1267,6 +1269,16 @@ TEST_F(ClientTest, TestScan) {
   DoTestScanWithKeyPredicate();
 }
 
+TEST_F(ClientTest, TestScanWithQueryId) {
+  NO_FATALS(InsertTestRows(client_table_.get(), FLAGS_test_scan_num_rows));
+  ASSERT_EQ(FLAGS_test_scan_num_rows, CountRowsFromClient(client_table_.get()));
+
+  // Scan with the specified query id.
+  DoTestScanWithStringPredicate("test_query_id");
+  // Scan without setting a query id; Open() generates one automatically.
+  DoTestScanWithStringPredicate();
+}
+
 TEST_F(ClientTest, TestScanAtSnapshot) {
   int half_the_rows = FLAGS_test_scan_num_rows / 2;
 
diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 1d767cf22..21efb537f 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1765,6 +1765,17 @@ Status KuduScanner::SetProjectedColumns(const vector<string>& col_names) {
   return SetProjectedColumnNames(col_names);
 }
 
+Status KuduScanner::SetQueryId(const string& query_id) {
+  if (data_->open_) {
+    return Status::IllegalState("Query id must be set before Open()");
+  }
+  if (query_id.empty()) {
+    return Status::InvalidArgument("Query id should not be empty");
+  }
+  data_->next_req_.set_query_id(query_id);
+  return Status::OK();
+}
+
 Status KuduScanner::SetProjectedColumnNames(const vector<string>& col_names) {
   if (data_->open_) {
     return Status::IllegalState("Projection must be set before Open()");
@@ -2012,6 +2023,11 @@ Status KuduScanner::Open() {
   MonoTime deadline = MonoTime::Now() + data_->configuration().timeout();
   set<string> blacklist;
 
+  if (data_->next_req_.query_id().empty()) {
+    static ObjectIdGenerator oid_generator;
+    data_->next_req_.set_query_id(oid_generator.Next());
+  }
+
   RETURN_NOT_OK(data_->OpenNextTablet(deadline, &blacklist));
 
   data_->open_ = true;
@@ -2309,6 +2325,14 @@ Status KuduScanTokenBuilder::SetCacheBlocks(bool cache_blocks) {
   return data_->mutable_configuration()->SetCacheBlocks(cache_blocks);
 }
 
+Status KuduScanTokenBuilder::SetQueryId(const string& query_id) {
+  if (query_id.empty()) {
+    return Status::InvalidArgument("Query id should not be empty");
+  }
+  data_->SetQueryId(query_id);
+  return Status::OK();
+}
+
 Status KuduScanTokenBuilder::Build(vector<KuduScanToken*>* tokens) {
   return data_->Build(tokens);
 }
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 026f75467..a46179814 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -2762,6 +2762,7 @@ class KUDU_EXPORT KuduScanner {
   ///   The table to perfrom scan. The given object must remain valid
   ///   for the lifetime of this scanner object.
   explicit KuduScanner(KuduTable* table);
+
   ~KuduScanner();
 
   /// Set the projection for the scanner using column names.
@@ -2797,6 +2798,27 @@ class KUDU_EXPORT KuduScanner {
       WARN_UNUSED_RESULT
       ATTRIBUTE_DEPRECATED("use SetProjectedColumnNames() instead");
 
+  /// Set a query id for the scan to trace the whole scanning process.
+  /// Must be called before Open(); an empty query id is rejected. If none
+  /// is set, Open() generates one automatically. The query id ties this
+  /// scan to the whole query process for troubleshooting and debugging.
+  ///
+  /// Example usage:
+  /// @code
+  ///   KuduScanner scanner(...);
+  ///   scanner.SetQueryId(query_id);
+  ///   scanner.Open();
+  ///   while (scanner.HasMoreRows()) {
+  ///     KuduScanBatch batch;
+  ///     scanner.NextBatch(&batch);
+  ///   }
+  /// @endcode
+  ///
+  /// @param [in] query_id
+  ///   A query id to identify a query.
+  /// @return Operation result status.
+  Status SetQueryId(const std::string& query_id);
+
   /// Add a predicate for the scan.
   ///
   /// @param [in] pred
@@ -3153,10 +3175,12 @@ class KUDU_EXPORT KuduScanner {
   FRIEND_TEST(ClientTest, TestScanFaultTolerance);
   FRIEND_TEST(ClientTest, TestScanNoBlockCaching);
   FRIEND_TEST(ClientTest, TestScanTimeout);
+  FRIEND_TEST(ClientTest, TestScanWithQueryId);
   FRIEND_TEST(ClientTest, TestReadAtSnapshotNoTimestampSet);
   FRIEND_TEST(ConsistencyITest, TestSnapshotScanTimestampReuse);
   FRIEND_TEST(ScanTokenTest, TestScanTokens);
   FRIEND_TEST(ScanTokenTest, TestScanTokens_NonUniquePrimaryKey);
+  FRIEND_TEST(ScanTokenTest, TestScanTokensWithQueryId);
 
   // Owned.
   Data* data_;
@@ -3350,6 +3374,15 @@ class KUDU_EXPORT KuduScanTokenBuilder {
   /// @return Operation result status.
   Status IncludeTabletMetadata(bool include_metadata) WARN_UNUSED_RESULT;
 
+  /// Set a query id for the scan to trace the whole process. An empty query
+  /// id is rejected; if none is set, each scanner built from the tokens
+  /// generates its own in Open(). Used for troubleshooting and debugging.
+  ///
+  /// @param [in] query_id
+  ///   A query id to identify a query.
+  /// @return Operation result status.
+  Status SetQueryId(const std::string& query_id);
+
   /// Build the set of scan tokens.
   ///
   /// The builder may be reused after this call.
diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index 05d5deaf8..55b863e58 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -197,6 +197,12 @@ message ScanTokenPB {
 
   // An authorization token with which to authorize the scan requests.
   optional security.SignedTokenPB authz_token = 24;
+
+  // Identifier of the top-level query which the scan built from this token
+  // is a part of. The query id associates the scanner built out of this
+  // token with the top-level query; it is useful for tracing the execution
+  // of the top-level query through various parts of the data pipeline.
+  optional string query_id = 25;
 }
 
 // All of the data necessary to authenticate to a cluster from a client with
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 17fce1b3b..9e0a86e7c 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -330,6 +330,10 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
     scan_builder->SetTimeoutMillis(message.scan_request_timeout_ms());
   }
 
+  if (message.has_query_id()) {
+    scan_builder->SetQueryId(message.query_id());
+  }
+
   *scanner = scan_builder.release();
   return Status::OK();
 }
@@ -465,6 +469,7 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
     pb.set_batch_size_bytes(configuration_.batch_size_bytes());
   }
 
+  pb.set_query_id(query_id_);
   PartitionPruner pruner;
   vector<MetaCache::RangeWithRemoteTablet> range_tablets;
   pruner.Init(*table->schema().schema_, table->partition_schema(), configuration_.spec());
diff --git a/src/kudu/client/scan_token-internal.h b/src/kudu/client/scan_token-internal.h
index 97d464053..6a4eaa8ac 100644
--- a/src/kudu/client/scan_token-internal.h
+++ b/src/kudu/client/scan_token-internal.h
@@ -86,11 +86,16 @@ class KuduScanTokenBuilder::Data {
     split_size_bytes_ = split_size_bytes;
   }
 
+  void SetQueryId(const std::string& query_id) {
+    query_id_ = query_id;
+  }
+
 private:
   ScanConfiguration configuration_;
   bool include_table_metadata_ = true;
   bool include_tablet_metadata_ = true;
   uint64_t split_size_bytes_ = 0;
+  std::string query_id_;
 };
 
 } // namespace client
diff --git a/src/kudu/client/scan_token-test.cc b/src/kudu/client/scan_token-test.cc
index d46bfc12b..21c3d9b16 100644
--- a/src/kudu/client/scan_token-test.cc
+++ b/src/kudu/client/scan_token-test.cc
@@ -697,6 +697,75 @@ TEST_F(ScanTokenTest, TestScanTokens_NonUniquePrimaryKey) {
   }
 }
 
+TEST_F(ScanTokenTest, TestScanTokensWithQueryId) {
+  int64_t insert_rows_num = 0;
+  {
+    // Create a table with 2 partitions and a replication factor of 1.
+    TestWorkload workload(cluster_.get(), TestWorkload::PartitioningType::HASH);
+    workload.set_table_name("test_table");
+    workload.set_num_tablets(2);
+    workload.set_num_replicas(1);
+    workload.Setup();
+    workload.Start();
+    ASSERT_EVENTUALLY([&]() { ASSERT_GE(workload.rows_inserted(), kRecordCount); });
+    workload.StopAndJoin();
+    insert_rows_num = workload.rows_inserted();
+  }
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
+  ASSERT_OK(client_->OpenTable("test_table", &table));
+  ASSERT_NE(nullptr, table.get());
+
+  // Scan with query id.
+  {
+    vector<KuduScanToken*> tokens;
+    KuduScanTokenBuilder builder(table.get());
+    ASSERT_OK(builder.SetQueryId("query-id-for-test"));
+    ASSERT_OK(builder.Build(&tokens));
+
+    int64_t count = 0;
+    ASSERT_EQ(2, tokens.size());
+    for (const auto& token : tokens) {
+      unique_ptr<KuduScanner> scanner;
+      ASSERT_OK(IntoUniqueScanner(client_.get(), *token, &scanner));
+      ASSERT_OK(scanner->Open());
+      ASSERT_EQ("query-id-for-test", scanner->data_->next_req_.query_id());
+      KuduScanBatch batch;
+      ASSERT_TRUE(scanner->HasMoreRows());
+
+      while (scanner->HasMoreRows()) {
+        ASSERT_OK(scanner->NextBatch(&batch));
+        count += batch.NumRows();
+      }
+    }
+    ASSERT_EQ(insert_rows_num, count);
+  }
+
+  // Scan without query id.
+  {
+    vector<KuduScanToken*> tokens;
+    KuduScanTokenBuilder builder(table.get());
+    ASSERT_OK(builder.Build(&tokens));
+
+    int64_t count = 0;
+    ASSERT_EQ(2, tokens.size());
+    for (const auto& token : tokens) {
+      unique_ptr<KuduScanner> scanner;
+      ASSERT_OK(IntoUniqueScanner(client_.get(), *token, &scanner));
+      ASSERT_OK(scanner->Open());
+      ASSERT_FALSE(scanner->data_->next_req_.query_id().empty());
+      KuduScanBatch batch;
+      ASSERT_TRUE(scanner->HasMoreRows());
+
+      while (scanner->HasMoreRows()) {
+        ASSERT_OK(scanner->NextBatch(&batch));
+        count += batch.NumRows();
+      }
+    }
+    ASSERT_EQ(insert_rows_num, count);
+  }
+}
+
 TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) {
   // Create schema
   KuduSchema schema;
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index a5ad65bb4..297c6009e 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -21,8 +21,8 @@
 #include <cstdint>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <unordered_map>
-#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -49,7 +49,7 @@
 #include "kudu/rpc/rpc_controller.h"
 #include "kudu/rpc/rpc_header.pb.h"
 #include "kudu/security/token.pb.h"
-#include "kudu/tserver/tserver_service.proxy.h"
+#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
 #include "kudu/util/async_util.h"
 #include "kudu/util/bitmap.h"
 #include "kudu/util/hexdump.h"
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 7ddbb3836..f023f9fac 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -2882,6 +2882,31 @@ TEST_F(TabletServerTest, TestConcurrentAccessToOneScanner) {
   ASSERT_EQ(total_rows, kNumRows);
 }
 
+// Test a scan request carrying a query id on the server side.
+TEST_F(TabletServerTest, TestScanWithQueryId) {
+  InsertTestRowsDirect(0, 100);
+
+  ScanRequestPB req;
+  NewScanRequestPB* scan = req.mutable_new_scan_request();
+  scan->set_tablet_id(kTabletId);
+  req.set_batch_size_bytes(0); // so it won't return data right away
+  ASSERT_OK(SchemaToColumnPBs(schema_, scan->mutable_projected_columns()));
+  req.set_query_id("query_id_for_test");
+
+  ScanResponsePB resp;
+  RpcController rpc;
+  // Send the call
+  {
+    SCOPED_TRACE(SecureDebugString(req));
+    ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
+    SCOPED_TRACE(SecureDebugString(resp));
+    ASSERT_FALSE(resp.has_error());
+  }
+  vector<string> results;
+  NO_FATALS(
+    DrainScannerToStrings(resp.scanner_id(), schema_, &results));
+  ASSERT_EQ(100, results.size());
+}
 
 TEST_F(TabletServerTest, TestScanWithStringPredicates) {
   InsertTestRowsDirect(0, 100);
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index f1704b001..bed57de7e 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -32,7 +32,6 @@
 
 #include <gflags/gflags.h>
 #include <glog/logging.h>
-#include <google/protobuf/stubs/port.h>
 
 #include "kudu/clock/clock.h"
 #include "kudu/common/column_predicate.h"
@@ -67,6 +66,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/template_util.h"
 #include "kudu/rpc/inbound_call.h"
 #include "kudu/rpc/remote_user.h"
 #include "kudu/rpc/rpc_context.h"
@@ -2751,15 +2751,16 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   DCHECK(error_code != nullptr);
   DCHECK(req->has_new_scan_request());
   const NewScanRequestPB& scan_pb = req->new_scan_request();
-  TRACE_EVENT1("tserver", "TabletServiceImpl::HandleNewScanRequest",
-               "tablet_id", scan_pb.tablet_id());
-
+  TRACE_EVENT2("tserver", "TabletServiceImpl::HandleNewScanRequest",
+               "tablet_id", scan_pb.tablet_id(),
+               "query_id", req->query_id());
   SharedScanner scanner;
   server_->scanner_manager()->NewScanner(replica,
                                          rpc_context->remote_user(),
                                          scan_pb.row_format_flags(),
                                          &scanner);
-  TRACE("Created scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
+  TRACE("Created scanner $0 for tablet $1, query id is $2",
+        scanner->id(), scanner->tablet_id(), req->query_id());
   auto scanner_lock = scanner->LockForAccess();
 
   // If we early-exit out of this function, automatically unregister
@@ -3032,9 +3033,9 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
                                                     bool* has_more_results,
                                                     TabletServerErrorPB::Code* error_code) {
   DCHECK(req->has_scanner_id());
-  TRACE_EVENT1("tserver", "TabletServiceImpl::HandleContinueScanRequest",
-               "scanner_id", req->scanner_id());
-
+  TRACE_EVENT2("tserver", "TabletServiceImpl::HandleContinueScanRequest",
+               "scanner_id", req->scanner_id(),
+               "query_id", req->query_id());
   size_t batch_size_bytes = GetMaxBatchSizeBytesHint(req);
 
   SharedScanner scanner;
@@ -3069,7 +3070,8 @@ Status TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
 
   VLOG(2) << "Found existing scanner " << scanner->id() << " for request: "
           << SecureShortDebugString(*req);
-  TRACE("Found scanner $0 for tablet $1", scanner->id(), scanner->tablet_id());
+  TRACE("Found scanner $0 for tablet $1, query id is $2",
+        scanner->id(), scanner->tablet_id(), req->query_id());
 
   if (batch_size_bytes == 0 && req->close_scanner()) {
     *has_more_results = false;
diff --git a/src/kudu/tserver/tserver.proto b/src/kudu/tserver/tserver.proto
index dfceae8cb..abe16e69e 100644
--- a/src/kudu/tserver/tserver.proto
+++ b/src/kudu/tserver/tserver.proto
@@ -400,6 +400,9 @@ message ScanRequestPB {
   // In order to simply close a scanner without selecting any rows, you
   // may set batch_size_bytes to 0 in conjunction with setting this flag.
   optional bool close_scanner = 5;
+
+  // Query id is used to trace the whole process of reading tablets.
+  optional bytes query_id = 6;
 }
 
 // RPC's resource metrics.