You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/10/23 21:10:08 UTC

[kudu] branch master updated: KUDU-2980: fix ORDERED scans when key columns are misordered in projection

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new e23e52a  KUDU-2980: fix ORDERED scans when key columns are misordered in projection
e23e52a is described below

commit e23e52a1b2040f12361824fde5334b46c879b142
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Thu Oct 17 17:10:39 2019 -0700

    KUDU-2980: fix ORDERED scans when key columns are misordered in projection
    
    This patch fixes KUDU-2980 by taking a stricter approach to how key columns
    appear in ORDERED scan projections. Any key columns already in the
    client-provided projection are completely ignored. Instead, when we rebuild
    the projection server-side, the first section (i.e. the part with the key
    columns) is built using the tablet's schema. After that, we incorporate any
    remaining columns from the client-provided projection, dropping any key
    columns. Finally, we add any columns necessary to satisfy predicates.
    
    The new fuzz test reproduces the bug very easily, and is expansive enough to
    provide additional coverage for funky projection and predicate ordering. I
    looped it 1000 times in dist-test without failure.
    
    Change-Id: Idb213f466c7d01b6953a863d8aa53a34b5ac8893
    Reviewed-on: http://gerrit.cloudera.org:8080/14503
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/client/client-test.cc              | 126 ++++++++++++++++++++++++++++
 src/kudu/common/generic_iterators.cc        |  15 ++--
 src/kudu/common/partial_row.h               |   2 +
 src/kudu/integration-tests/data_gen_util.cc |  12 +++
 src/kudu/integration-tests/data_gen_util.h  |   5 ++
 src/kudu/tserver/tablet_service.cc          |  64 ++++++++------
 src/kudu/util/random_util.h                 |   3 +-
 7 files changed, 195 insertions(+), 32 deletions(-)

diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index f44eaa6..4e19e00 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -28,6 +28,7 @@
 #include <set>
 #include <string>
 #include <thread>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -75,7 +76,9 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/integration-tests/data_gen_util.h"
 #include "kudu/master/catalog_manager.h"
 #include "kudu/master/master.h"
 #include "kudu/master/master.pb.h"
@@ -104,6 +107,8 @@
 #include "kudu/util/monotime.h"
 #include "kudu/util/net/sockaddr.h"
 #include "kudu/util/pb_util.h"
+#include "kudu/util/random.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/semaphore.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -177,6 +182,7 @@ using std::set;
 using std::string;
 using std::thread;
 using std::unique_ptr;
+using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
@@ -6376,5 +6382,125 @@ TEST_F(ClientWithLocationTest, LocationCacheMetricsOnClientConnectToCluster) {
   ASSERT_EQ(hits_before + kIterNum, hits_after);
 }
 
+// Regression test for KUDU-2980.
+TEST_F(ClientTest, TestProjectionPredicatesFuzz) {
+  const int kNumColumns = 20;
+  const int kNumPKColumns = kNumColumns / 2;
+  const char* const kTableName = "test";
+
+  // Create a schema with half of the columns in the primary key.
+  //
+  // Use a smattering of different physical data types to increase the
+  // likelihood of a size transition between primary key components.
+  KuduSchemaBuilder b;
+  vector<string> pk_col_names;
+  vector<string> all_col_names;
+  KuduColumnSchema::DataType data_type = KuduColumnSchema::STRING;
+  for (int i = 0; i < kNumColumns; i++) {
+    string col_name = std::to_string(i);
+    b.AddColumn(col_name)->Type(data_type)->NotNull();
+    if (i < kNumPKColumns) {
+      pk_col_names.emplace_back(col_name);
+    }
+    all_col_names.emplace_back(std::move(col_name));
+
+    // Rotate to next data type.
+    switch (data_type) {
+      case KuduColumnSchema::INT8: data_type = KuduColumnSchema::INT16; break;
+      case KuduColumnSchema::INT16: data_type = KuduColumnSchema::INT32; break;
+      case KuduColumnSchema::INT32: data_type = KuduColumnSchema::INT64; break;
+      case KuduColumnSchema::INT64: data_type = KuduColumnSchema::STRING; break;
+      case KuduColumnSchema::STRING: data_type = KuduColumnSchema::INT8; break;
+      default: LOG(FATAL) << "Unexpected data type " << data_type;
+    }
+  }
+  b.SetPrimaryKey(pk_col_names);
+  KuduSchema schema;
+  ASSERT_OK(b.Build(&schema));
+
+  // Create and open the table. We don't care about replication or partitioning.
+  unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
+  ASSERT_OK(table_creator->table_name(kTableName)
+                          .schema(&schema)
+                          .set_range_partition_columns({})
+                          .num_replicas(1)
+                          .Create());
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(kTableName, &table));
+
+  unique_ptr<KuduScanner> scanner;
+  scanner.reset(new KuduScanner(table.get()));
+
+  // Pick a random selection of columns to project.
+  //
+  // Done before inserting data so the projection can be used to determine the
+  // expected scan results.
+  Random rng(SeedRandom());
+  vector<string> projected_col_names =
+      SelectRandomSubset<vector<string>, string, Random>(all_col_names, 0, &rng);
+  std::random_shuffle(projected_col_names.begin(), projected_col_names.end());
+  ASSERT_OK(scanner->SetProjectedColumnNames(projected_col_names));
+
+  // Insert some rows with randomized keys, flushing the tablet periodically.
+  const int kNumRows = 20;
+  shared_ptr<KuduSession> session = client_->NewSession();
+  vector<string> expected_rows;
+  for (int i = 0; i < kNumRows; i++) {
+    unique_ptr<KuduInsert> insert(table->NewInsert());
+    KuduPartialRow* row = insert->mutable_row();
+    GenerateDataForRow(schema, &rng, row);
+
+    // Store a copy of the row for later, to compare with the scan results.
+    //
+    // The copy should look like KuduScanBatch::RowPtr::ToString() and must
+    // conform to the projection schema.
+    ConstContiguousRow ccr(row->schema(), row->row_data_);
+    string row_str = "(";
+    row_str += JoinMapped(projected_col_names, [&](const string& col_name) {
+        int col_idx = row->schema()->find_column(col_name);
+        const auto& col = row->schema()->column(col_idx);
+        DCHECK_NE(Schema::kColumnNotFound, col_idx);
+        string cell;
+        col.DebugCellAppend(ccr.cell(col_idx), &cell);
+        return cell;
+      }, ", ");
+    row_str += ")";
+    expected_rows.emplace_back(std::move(row_str));
+
+    ASSERT_OK(session->Apply(insert.release()));
+
+    // Leave one row in the tablet's MRS so that the scan includes one rowset
+    // without bounds. This affects the behavior of FT scans.
+    if (i < kNumRows - 1 && rng.OneIn(2)) {
+      NO_FATALS(FlushTablet(GetFirstTabletId(table.get())));
+    }
+  }
+
+  // Pick a random selection of columns for predicates.
+  //
+  // We use NOT NULL predicates so as to tickle the server-side code for dealing
+  // with predicates without actually affecting the scan results.
+  vector<string> predicate_col_names =
+      SelectRandomSubset<vector<string>, string, Random>(all_col_names, 0, &rng);
+  for (const auto& col_name : predicate_col_names) {
+    unique_ptr<KuduPredicate> pred(table->NewIsNotNullPredicate(col_name));
+    ASSERT_OK(scanner->AddConjunctPredicate(pred.release()));
+  }
+
+  // Use a fault tolerant scan half the time.
+  if (rng.OneIn(2)) {
+    ASSERT_OK(scanner->SetFaultTolerant());
+  }
+
+  // Perform the scan and verify the results.
+  //
+  // We ignore result ordering because although FT scans will produce rows
+  // sorted by primary keys, regular scans will not.
+  vector<string> rows;
+  ASSERT_OK(ScanToStrings(scanner.get(), &rows));
+  ASSERT_EQ(unordered_set<string>(expected_rows.begin(), expected_rows.end()),
+            unordered_set<string>(rows.begin(), rows.end())) << rows;
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/common/generic_iterators.cc b/src/kudu/common/generic_iterators.cc
index 3c1c987..b865f79 100644
--- a/src/kudu/common/generic_iterators.cc
+++ b/src/kudu/common/generic_iterators.cc
@@ -26,6 +26,7 @@
 #include <iterator>
 #include <memory>
 #include <mutex>
+#include <numeric>
 #include <ostream>
 #include <string>
 #include <unordered_map>
@@ -1167,12 +1168,13 @@ MaterializingIterator::MaterializingIterator(unique_ptr<ColumnwiseIterator> iter
 Status MaterializingIterator::Init(ScanSpec *spec) {
   RETURN_NOT_OK(iter_->Init(spec));
 
-  int32_t num_columns = schema().num_columns();
+  const int32_t num_columns = schema().num_columns();
   col_idx_predicates_.clear();
   non_predicate_column_indexes_.clear();
 
-  if (spec != nullptr && !disallow_pushdown_for_tests_) {
+  if (PREDICT_FALSE(!disallow_pushdown_for_tests_) && spec != nullptr) {
     col_idx_predicates_.reserve(spec->predicates().size());
+    DCHECK_GE(num_columns, spec->predicates().size());
     non_predicate_column_indexes_.reserve(num_columns - spec->predicates().size());
 
     for (const auto& col_pred : spec->predicates()) {
@@ -1185,7 +1187,7 @@ Status MaterializingIterator::Init(ScanSpec *spec) {
       col_idx_predicates_.emplace_back(col_idx, col_pred.second);
     }
 
-    for (int32_t col_idx = 0; col_idx < schema().num_columns(); col_idx++) {
+    for (int32_t col_idx = 0; col_idx < num_columns; col_idx++) {
       if (!ContainsKey(spec->predicates(), schema().column(col_idx).name())) {
         non_predicate_column_indexes_.emplace_back(col_idx);
       }
@@ -1195,10 +1197,9 @@ Status MaterializingIterator::Init(ScanSpec *spec) {
     // scan spec so higher layers don't repeat our work.
     spec->RemovePredicates();
   } else {
-    non_predicate_column_indexes_.reserve(num_columns);
-    for (int32_t col_idx = 0; col_idx < num_columns; col_idx++) {
-      non_predicate_column_indexes_.emplace_back(col_idx);
-    }
+    non_predicate_column_indexes_.resize(num_columns);
+    std::iota(non_predicate_column_indexes_.begin(),
+              non_predicate_column_indexes_.end(), 0);
   }
 
   // Sort the predicates by selectivity so that the most selective are evaluated
diff --git a/src/kudu/common/partial_row.h b/src/kudu/common/partial_row.h
index ac2816b..3a50907 100644
--- a/src/kudu/common/partial_row.h
+++ b/src/kudu/common/partial_row.h
@@ -42,6 +42,7 @@
 namespace kudu {
 class ColumnSchema;
 namespace client {
+class ClientTest_TestProjectionPredicatesFuzz_Test;
 class KuduWriteOperation;
 template<typename KeyTypeWrapper> struct SliceKeysTestSetup;// IWYU pragma: keep
 template<typename KeyTypeWrapper> struct IntKeysTestSetup;  // IWYU pragma: keep
@@ -595,6 +596,7 @@ class KUDU_EXPORT KuduPartialRow {
   template<typename KeyTypeWrapper> friend struct client::IntKeysTestSetup;
   template<typename KeyTypeWrapper> friend struct tablet::SliceTypeRowOps;
   template<typename KeyTypeWrapper> friend struct tablet::NumTypeRowOps;
+  FRIEND_TEST(client::ClientTest, TestProjectionPredicatesFuzz);
   FRIEND_TEST(KeyUtilTest, TestIncrementInt128PrimaryKey);
   FRIEND_TEST(PartitionPrunerTest, TestIntPartialPrimaryKeyRangePruning);
   FRIEND_TEST(PartitionPrunerTest, TestPartialPrimaryKeyRangePruning);
diff --git a/src/kudu/integration-tests/data_gen_util.cc b/src/kudu/integration-tests/data_gen_util.cc
index 7a46ed2..8f5d886 100644
--- a/src/kudu/integration-tests/data_gen_util.cc
+++ b/src/kudu/integration-tests/data_gen_util.cc
@@ -90,6 +90,12 @@ void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
   }
 }
 
+template <class RNG>
+void GenerateDataForRow(const client::KuduSchema& schema,
+                        RNG* random, KuduPartialRow* row) {
+  GenerateDataForRow(schema, random->Next64(), random, row);
+}
+
 // Explicit specialization for callers outside this compilation unit.
 template
 void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
@@ -97,5 +103,11 @@ void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
 template
 void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
                         ThreadSafeRandom* random, KuduPartialRow* row);
+template
+void GenerateDataForRow(const client::KuduSchema& schema,
+                        Random* random, KuduPartialRow* row);
+template
+void GenerateDataForRow(const client::KuduSchema& schema,
+                        ThreadSafeRandom* random, KuduPartialRow* row);
 
 } // namespace kudu
diff --git a/src/kudu/integration-tests/data_gen_util.h b/src/kudu/integration-tests/data_gen_util.h
index c7ff838..1130f8a 100644
--- a/src/kudu/integration-tests/data_gen_util.h
+++ b/src/kudu/integration-tests/data_gen_util.h
@@ -44,4 +44,9 @@ template<class RNG>
 void GenerateDataForRow(const client::KuduSchema& schema, uint64_t record_id,
                         RNG* random, KuduPartialRow* row);
 
+// Like the above but randomly generates the entire key.
+template<class RNG>
+void GenerateDataForRow(const client::KuduSchema& schema,
+                        RNG* random, KuduPartialRow* row);
+
 } // namespace kudu
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index cbeae9b..c0277af 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -55,6 +55,7 @@
 #include "kudu/consensus/raft_consensus.h"
 #include "kudu/consensus/replica_management.pb.h"
 #include "kudu/consensus/time_manager.h"
+#include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/casts.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/macros.h"
@@ -2177,19 +2178,6 @@ static Status SetupScanSpec(const NewScanRequestPB& scan_pb,
     }
   }
 
-  // When doing an ordered scan, we need to include the key columns to be able to encode
-  // the last row key for the scan response.
-  if (scan_pb.order_mode() == kudu::ORDERED &&
-      projection.num_key_columns() != tablet_schema.num_key_columns()) {
-    for (int i = 0; i < tablet_schema.num_key_columns(); i++) {
-      const ColumnSchema &col = tablet_schema.column(i);
-      if (projection.find_column(col.name()) == -1 &&
-          !ContainsKey(missing_col_names, col.name())) {
-        missing_cols->push_back(col);
-        InsertOrDie(&missing_col_names, col.name());
-      }
-    }
-  }
   // Then any encoded key range predicates.
   RETURN_NOT_OK(DecodeEncodedKeyRange(scan_pb, tablet_schema, scanner, ret.get()));
 
@@ -2308,9 +2296,9 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
 
   gscoped_ptr<ScanSpec> spec(new ScanSpec);
 
-  // Missing columns will contain the columns that are not mentioned in the client
-  // projection but are actually needed for the scan, such as columns referred to by
-  // predicates or key columns (if this is an ORDERED scan).
+  // Missing columns will contain the columns that are not mentioned in the
+  // client projection but are actually needed for the scan, such as columns
+  // referred to by predicates.
   vector<ColumnSchema> missing_cols;
   s = SetupScanSpec(scan_pb, tablet_schema, projection, &missing_cols, &spec, scanner);
   if (PREDICT_FALSE(!s.ok())) {
@@ -2332,15 +2320,43 @@ Status TabletServiceImpl::HandleNewScanRequest(TabletReplica* replica,
   gscoped_ptr<Schema> orig_projection(new Schema(projection));
   scanner->set_client_projection_schema(std::move(orig_projection));
 
-  // Build a new projection with the projection columns and the missing columns. Make
-  // sure to set whether the column is a key column appropriately.
+  // Build a new projection with the projection columns and the missing columns,
+  // annotating each column as a key column appropriately.
+  //
+  // Note: the projection is a consistent schema (i.e. no duplicate columns).
+  // However, it has some different semantics as compared to the tablet schema:
+  // - It might not contain all of the columns in the tablet.
+  // - It might contain extra columns not found in the tablet schema. Virtual
+  //   columns are permitted, while others will cause the scan to fail later,
+  //   when the tablet validates the projection.
+  // - It doesn't know which of its columns are key columns. That's fine for
+  //   an UNORDERED scan, but we'll need to fix this for an ORDERED scan, which
+  //   requires all key columns in tablet schema order.
   SchemaBuilder projection_builder;
-  vector<ColumnSchema> projection_columns = projection.columns();
-  for (const ColumnSchema& col : missing_cols) {
-    projection_columns.push_back(col);
-  }
-  for (const ColumnSchema& col : projection_columns) {
-    CHECK_OK(projection_builder.AddColumn(col, tablet_schema.is_key_column(col.name())));
+  if (scan_pb.order_mode() == ORDERED) {
+    for (int i = 0; i < tablet_schema.num_key_columns(); i++) {
+      const auto& col = tablet_schema.column(i);
+      // CHECK_OK is safe because the tablet schema has no duplicate columns.
+      CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ true));
+    }
+    for (int i = 0; i < projection.num_columns(); i++) {
+      const auto& col = projection.column(i);
+      // Any key columns in the projection will be ignored.
+      ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
+    }
+    for (const ColumnSchema& col : missing_cols) {
+      // Any key columns in 'missing_cols' will be ignored.
+      ignore_result(projection_builder.AddColumn(col, /* is_key= */ false));
+    }
+  } else {
+    projection_builder.Reset(projection);
+    for (const ColumnSchema& col : missing_cols) {
+      // CHECK_OK is safe because the builder's columns (from the projection)
+      // and the missing columns are disjoint sets.
+      //
+      // UNORDERED scans don't need to know which columns are part of the key.
+      CHECK_OK(projection_builder.AddColumn(col, /* is_key= */ false));
+    }
   }
   projection = projection_builder.BuildWithoutIds();
 
diff --git a/src/kudu/util/random_util.h b/src/kudu/util/random_util.h
index a607051..73eafd0 100644
--- a/src/kudu/util/random_util.h
+++ b/src/kudu/util/random_util.h
@@ -49,7 +49,8 @@ T SelectRandomElement(const Container& c, Rand* r) {
   return rand_list[0];
 }
 
-// Returns a randomly-selected subset from the container.
+// Returns a randomly-selected subset from the container. The subset will
+// include at least 'min_to_return' results, but may contain more.
 //
 // The results are not stored in a randomized order: the order of results will
 // match their order in the input collection.