You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2020/01/06 05:19:27 UTC

[kudu] branch master updated: KUDU-3032 Prevent selecting unnecessary columns after scan optimization

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new c767bd1  KUDU-3032 Prevent selecting unnecessary columns after scan optimization
c767bd1 is described below

commit c767bd1149757094d8df96eaca879c06a5c1b648
Author: triplesheep <tr...@gmail.com>
AuthorDate: Sun Dec 22 19:39:22 2019 +0800

    KUDU-3032 Prevent selecting unnecessary columns after scan optimization
    
    This applies not only to key columns whose predicates can be converted
    into a primary key range, but also to non-nullable columns whose
    IS NOT NULL predicates can be optimized away.
    
    Change-Id: I82c30c5272b6fc114b710df9d6c31d5d1e319e31
    Reviewed-on: http://gerrit.cloudera.org:8080/14945
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/common/scan_spec-test.cc      | 54 ++++++++++++++++++++++++++++++-
 src/kudu/common/scan_spec.cc           | 17 ++++++++++
 src/kudu/common/scan_spec.h            |  7 +++-
 src/kudu/tserver/tablet_server-test.cc | 59 ++++++++++++++++++++++++++++++++++
 src/kudu/tserver/tablet_service.cc     | 31 ++++++------------
 5 files changed, 145 insertions(+), 23 deletions(-)

diff --git a/src/kudu/common/scan_spec-test.cc b/src/kudu/common/scan_spec-test.cc
index 79969bd..fbbd3ca 100644
--- a/src/kudu/common/scan_spec-test.cc
+++ b/src/kudu/common/scan_spec-test.cc
@@ -41,12 +41,22 @@
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
 using std::vector;
 
 namespace kudu {
 
+static std::string ToString(const vector<ColumnSchema>& columns) {
+  std::string str;
+  for (const auto& column : columns) {
+    str += column.ToString();
+    str += "\n";
+  }
+  return str;
+}
+
 class TestScanSpec : public KuduTest {
  public:
   explicit TestScanSpec(const Schema& s)
@@ -136,7 +146,8 @@ class CompositeIntKeysTest : public TestScanSpec {
         Schema({ ColumnSchema("a", INT8),
                  ColumnSchema("b", INT8),
                  ColumnSchema("c", INT8),
-                 ColumnSchema("d", INT8, true) },
+                 ColumnSchema("d", INT8, true),
+                 ColumnSchema("e", INT8) },
                3)) {
   }
 };
@@ -705,6 +716,47 @@ TEST_F(CompositeIntKeysTest, TestLiftPrimaryKeyBounds_WithPredicates) {
   }
 }
 
+// Predicates: a >= 3 AND b >= 4 AND c IS NOT NULL AND d IS NOT NULL.
+TEST_F(CompositeIntKeysTest, TestGetMissingColumns) {
+  ScanSpec spec;
+  AddPredicate<int8_t>(&spec, "a", GE, 3);
+  AddPredicate<int8_t>(&spec, "b", GE, 4);
+  spec.AddPredicate(ColumnPredicate::IsNotNull(schema_.column(2)));
+  spec.AddPredicate(ColumnPredicate::IsNotNull(schema_.column(3)));
+  SCOPED_TRACE(spec.ToString(schema_));
+  spec.OptimizeScan(schema_, &arena_, &pool_, true);
+  EXPECT_EQ("PK >= (int8 a=3, int8 b=4, int8 c=-128) AND "
+            "b >= 4 AND d IS NOT NULL", spec.ToString(schema_));
+  EXPECT_EQ(2, spec.predicates().size());
+  {
+    // Projection: e.
+    Schema projection({ ColumnSchema("e", INT8) }, 0);
+    vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);
+    EXPECT_EQ(2, missing_cols.size());
+    std::string missing_cols_str = ToString(missing_cols);
+    EXPECT_STR_CONTAINS(missing_cols_str, "b INT8");
+    EXPECT_STR_CONTAINS(missing_cols_str, "d INT8");
+  }
+  {
+    // Projection: d e.
+    Schema projection({ ColumnSchema("d", INT8, true),
+                        ColumnSchema("e", INT8) }, 0);
+    vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);
+    EXPECT_EQ(1, missing_cols.size());
+    std::string missing_cols_str = ToString(missing_cols);
+    EXPECT_STR_CONTAINS(missing_cols_str, "b INT8");
+  }
+
+  {
+    // Projection: b d e.
+    Schema projection({ ColumnSchema("b", INT8),
+                        ColumnSchema("d", INT8, true),
+                        ColumnSchema("e", INT8) }, 0);
+    vector<ColumnSchema> missing_cols = spec.GetMissingColumns(projection);
+    EXPECT_EQ(0, missing_cols.size());
+  }
+}
+
 // Tests for String parts in composite keys
 //------------------------------------------------------------
 class CompositeIntStringKeysTest : public TestScanSpec {
diff --git a/src/kudu/common/scan_spec.cc b/src/kudu/common/scan_spec.cc
index 0660c10..d20826d 100644
--- a/src/kudu/common/scan_spec.cc
+++ b/src/kudu/common/scan_spec.cc
@@ -21,6 +21,7 @@
 #include <iterator>
 #include <ostream>
 #include <string>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -45,6 +46,7 @@ using std::max;
 using std::move;
 using std::pair;
 using std::string;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -159,6 +161,21 @@ void ScanSpec::OptimizeScan(const Schema& schema,
   }
 }
 
+vector<ColumnSchema> ScanSpec::GetMissingColumns(const Schema& projection) {
+  vector<ColumnSchema> missing_cols;
+  unordered_set<string> missing_col_names;
+  for (const auto& entry : predicates_) {
+    const auto& predicate = entry.second;
+    const auto& column_name = predicate.column().name();
+    if (projection.find_column(column_name) == Schema::kColumnNotFound &&
+        !ContainsKey(missing_col_names, column_name)) {
+      missing_cols.push_back(predicate.column());
+      InsertOrDie(&missing_col_names, column_name);
+    }
+  }
+  return missing_cols;
+}
+
 void ScanSpec::PushPredicatesIntoPrimaryKeyBounds(const Schema& schema,
                                                   Arena* arena,
                                                   AutoReleasePool* pool,
diff --git a/src/kudu/common/scan_spec.h b/src/kudu/common/scan_spec.h
index 64c8d2c..224694f 100644
--- a/src/kudu/common/scan_spec.h
+++ b/src/kudu/common/scan_spec.h
@@ -19,6 +19,7 @@
 #include <cstdint>
 #include <string>
 #include <unordered_map>
+#include <vector>
 
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
@@ -28,8 +29,9 @@
 
 namespace kudu {
 
-class AutoReleasePool;
 class Arena;
+class AutoReleasePool;
+class ColumnSchema;
 class EncodedKey;
 class Schema;
 
@@ -68,6 +70,9 @@ class ScanSpec {
                     AutoReleasePool* pool,
                     bool remove_pushed_predicates);
 
+  // Get the columns referenced by the predicates but missing from 'projection'.
+  std::vector<ColumnSchema> GetMissingColumns(const Schema& projection);
+
   // Set the lower bound (inclusive) primary key for the scan.
   // Does not take ownership of 'key', which must remain valid.
   // If called multiple times, the most restrictive key will be used.
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index a725390..f71974f 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -2698,6 +2698,65 @@ TEST_F(TabletServerTest, TestScanWithEncodedPredicates) {
             results.back());
 }
 
+TEST_F(TabletServerTest, TestScanWithSimplifiablePredicates) {
+  int num_rows = AllowSlowTests() ? 10000 : 1000;
+  InsertTestRowsDirect(0, num_rows);
+
+  ScanRequestPB req;
+  ScanResponsePB resp;
+  RpcController rpc;
+
+  NewScanRequestPB* scan = req.mutable_new_scan_request();
+  scan->set_tablet_id(kTabletId);
+  req.set_batch_size_bytes(0); // so it won't return data right away
+  // Set up a projection without the key columns or the column after the last key column
+  SchemaBuilder sb(schema_);
+  for (int i = 0; i <= schema_.num_key_columns(); i++) {
+    sb.RemoveColumn(schema_.column(i).name());
+  }
+  const Schema& projection = sb.BuildWithoutIds();
+  ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
+
+  // Set up a key range predicate: 51 <= key < 100
+  ColumnPredicatePB* key_predicate = scan->add_column_predicates();
+  key_predicate->set_column(schema_.column(0).name());
+  ColumnPredicatePB::Range* range = key_predicate->mutable_range();
+  int32_t lower_bound_inclusive = 51;
+  int32_t upper_bound_exclusive = 100;
+  range->mutable_lower()->append(
+    reinterpret_cast<char*>(&lower_bound_inclusive), sizeof(lower_bound_inclusive));
+  range->mutable_upper()->append(
+    reinterpret_cast<char*>(&upper_bound_exclusive), sizeof(upper_bound_exclusive));
+  // Set up an IS NOT NULL predicate on a non-nullable column.
+  ColumnPredicatePB* is_not_null_predicate = scan->add_column_predicates();
+  is_not_null_predicate->set_column(schema_.column(1).name());
+  is_not_null_predicate->mutable_is_not_null();
+  // Send the call
+  {
+    SCOPED_TRACE(SecureDebugString(req));
+    ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
+    SCOPED_TRACE(SecureDebugString(resp));
+    ASSERT_FALSE(resp.has_error());
+  }
+
+  // Ensure that the scanner includes correct columns.
+  {
+    auto scan_descriptors = mini_server_->server()->scanner_manager()->ListScans();
+    ASSERT_EQ(1, projection.columns().size());
+    ASSERT_EQ(1, scan_descriptors.size());
+    ASSERT_EQ(projection.columns().size(), scan_descriptors[0].projected_columns.size());
+    ASSERT_EQ(2, scan_descriptors[0].predicates.size());
+    ASSERT_EQ(projection.columns().size(), scan_descriptors[0].iterator_stats.size());
+    ASSERT_EQ(projection.column(0).name(), scan_descriptors[0].iterator_stats[0].first);
+  }
+
+  // Drain all the rows from the scanner.
+  vector<string> results;
+  NO_FATALS(
+    DrainScannerToStrings(resp.scanner_id(), projection, &results));
+  ASSERT_EQ(49, results.size());
+}
+
 // Test for diff scan RPC interface.
 TEST_F(TabletServerTest, TestDiffScan) {
   // Insert 100 rows with the usual pattern.
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 6621b5f..15d11c1 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -91,6 +91,7 @@
 #include "kudu/tserver/tserver_admin.pb.h"
 #include "kudu/tserver/tserver_service.pb.h"
 #include "kudu/util/auto_release_pool.h"
+#include "kudu/util/bitset.h"
 #include "kudu/util/crc.h"
 #include "kudu/util/debug/trace_event.h"
 #include "kudu/util/faststring.h"
@@ -2141,26 +2142,15 @@ static Status DecodeEncodedKeyRange(const NewScanRequestPB& scan_pb,
 
 static Status SetupScanSpec(const NewScanRequestPB& scan_pb,
                             const Schema& tablet_schema,
-                            const Schema& projection,
-                            vector<ColumnSchema>* missing_cols,
                             gscoped_ptr<ScanSpec>* spec,
                             const SharedScanner& scanner) {
   gscoped_ptr<ScanSpec> ret(new ScanSpec);
   ret->set_cache_blocks(scan_pb.cache_blocks());
 
-  unordered_set<string> missing_col_names;
-
   // First the column predicates.
   for (const ColumnPredicatePB& pred_pb : scan_pb.column_predicates()) {
     boost::optional<ColumnPredicate> predicate;
     RETURN_NOT_OK(ColumnPredicateFromPB(tablet_schema, scanner->arena(), pred_pb, &predicate));
-
-    if (projection.find_column(predicate->column().name()) == Schema::kColumnNotFound &&
-        !ContainsKey(missing_col_names, predicate->column().name())) {
-      InsertOrDie(&missing_col_names, predicate->column().name());
-      missing_cols->push_back(predicate->column());
-    }
-
     ret->AddPredicate(std::move(*predicate));
   }
 
@@ -2175,11 +2165,6 @@ static Status SetupScanSpec(const NewScanRequestPB& scan_pb,
     }
     boost::optional<ColumnSchema> col;
     RETURN_NOT_OK(ColumnSchemaFromPB(pred_pb.column(), &col));
-    if (projection.find_column(col->name()) == Schema::kColumnNotFound &&
-        !ContainsKey(missing_col_names, col->name())) {
-      missing_cols->push_back(*col);
-      InsertOrDie(&missing_col_names, col->name());
-    }
 
     const void* lower_bound = nullptr;
     const void* upper_bound = nullptr;
@@ -2324,11 +2309,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
 
   gscoped_ptr<ScanSpec> spec(new ScanSpec);
 
-  // Missing columns will contain the columns that are not mentioned in the
-  // client projection but are actually needed for the scan, such as columns
-  // referred to by predicates.
-  vector<ColumnSchema> missing_cols;
-  s = SetupScanSpec(scan_pb, tablet_schema, projection, &missing_cols, &spec, scanner);
+  s = SetupScanSpec(scan_pb, tablet_schema, &spec, scanner);
   if (PREDICT_FALSE(!s.ok())) {
     *error_code = TabletServerErrorPB::INVALID_SCAN_SPEC;
     return s;
@@ -2338,6 +2319,13 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   spec->OptimizeScan(tablet_schema, scanner->arena(), scanner->autorelease_pool(), true);
   VLOG(3) << "After optimizing scan spec: " << spec->ToString(tablet_schema);
 
+  // Missing columns will contain the columns that are not mentioned in the
+  // client projection but are actually needed for the scan, such as columns
+  // referred to by predicates.
+  //
+  // NOTE: The missing columns must be computed after optimizing the scan, since
+  // optimization removes unnecessary predicates.
+  vector<ColumnSchema> missing_cols = spec->GetMissingColumns(projection);
   if (spec->CanShortCircuit()) {
     VLOG(1) << "short-circuiting without creating a server-side scanner.";
     *has_more_results = false;
@@ -2387,6 +2375,7 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
     }
   }
   projection = projection_builder.BuildWithoutIds();
+  VLOG(3) << "Scan projection: " << projection.ToString(Schema::BASE_INFO);
 
   unique_ptr<RowwiseIterator> iter;