You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/06/08 00:23:29 UTC

[kudu] 05/05: KUDU-2809 (5/6): add diff scan support to fuzz-itest

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9dfe8f8e671b436996867083a4e9d82df5e1997f
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jun 5 17:51:13 2019 -0700

    KUDU-2809 (5/6): add diff scan support to fuzz-itest
    
    This patch adds diff scans to the set of operations supported by fuzz-itest.
    The existing saved_values_ map is quite difficult to reuse for verifying
    diff scan results, so instead I added a saved_redos_ map that tracks every
    insertion/mutation as a discrete "redo".
    
    For coverage, there are two new tests for KUDU-2809. I also ran fuzz-itest
    in slow mode a few thousand times on this patch series without failure.
    
    Change-Id: I3f7dae20ef1b903dba80e90d5f491e4322815fbb
    Reviewed-on: http://gerrit.cloudera.org:8080/13537
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/integration-tests/fuzz-itest.cc | 276 +++++++++++++++++++++++++++++--
 src/kudu/tablet/key_value_test_schema.h  |   4 +
 2 files changed, 262 insertions(+), 18 deletions(-)

diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index dbc0ee4..2ee1a24 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
 #include <cstdint>
 #include <cstdlib>
 #include <functional>
@@ -31,6 +32,7 @@
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
+#include <glog/stl_logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/client/client-test-util.h"
@@ -115,6 +117,7 @@ enum TestOpType {
   TEST_COMPACT_TABLET,
   TEST_RESTART_TS,
   TEST_SCAN_AT_TIMESTAMP,
+  TEST_DIFF_SCAN,
   TEST_NUM_OP_TYPES // max value for enum
 };
 
@@ -133,24 +136,31 @@ const char* TestOpType_names[] = {
   "TEST_MAJOR_COMPACT_DELTAS",
   "TEST_COMPACT_TABLET",
   "TEST_RESTART_TS",
-  "TEST_SCAN_AT_TIMESTAMP"
+  "TEST_SCAN_AT_TIMESTAMP",
+  "TEST_DIFF_SCAN"
 };
 
 // An operation in a fuzz-test sequence.
 struct TestOp {
   // NOLINTNEXTLINE(google-explicit-constructor)
-  TestOp(TestOpType t, int v = 0) // NOLINT(runtime/explicit)
+  TestOp(TestOpType t, int v1 = 0, int v2 = 0) // NOLINT(runtime/explicit)
       : type(t),
-        val(v) {}
+        val(v1),
+        val2(v2) {}
 
   // The op to run.
   TestOpType type;
 
   // For INSERT/UPSERT/UPDATE/DELETE, the key of the row to be modified.
   // For SCAN_AT_TIMESTAMP the timestamp of the scan.
+  // For DIFF_SCAN the start timestamp of the scan.
   // Otherwise, unused.
   int val;
 
+  // For DIFF_SCAN, the end timestamp of the scan.
+  // Otherwise, unused.
+  int val2;
+
   string ToString() const {
     switch (type) {
       case TEST_FLUSH_OPS:
@@ -169,6 +179,8 @@ struct TestOp {
       case TEST_DELETE:
       case TEST_SCAN_AT_TIMESTAMP:
         return strings::Substitute("{$0, $1}", TestOpType_names[type], val);
+      case TEST_DIFF_SCAN:
+        return strings::Substitute("{$0, $1, $2}", TestOpType_names[type], val, val2);
       default:
         LOG(FATAL) << "Invalid op type: " << type;
     }
@@ -176,6 +188,31 @@ struct TestOp {
   }
 };
 
+enum RedoType {  // Kind of change recorded by a Redo.
+  INSERT,  // An insert/upsert that hit a key with no pending value.
+  UPDATE,  // A mutation (or upsert) of a key that already had a pending value.
+  DELETE,  // A row deletion; carries no value.
+};
+
+struct Redo {  // One discrete insertion/mutation, tracked to verify diff scans.
+  Redo(RedoType t, int32_t k, optional<int32_t> v = boost::none)
+      : rtype(t),
+        key(k),
+        val(std::move(v)) {}
+
+  string ToString() const {  // Human-readable form, used in VLOG output.
+    if (rtype == DELETE) {  // Deletes print the key only; 'val' is ignored.
+      return strings::Substitute("{DELETE key=$0}", key);
+    }
+    return strings::Substitute("{$0 key=$1 val=$2}",
+                               rtype == INSERT ? "INSERT" : "UPDATE", key,
+                               val ? std::to_string(*val) : "NULL");
+  }
+  RedoType rtype;  // What kind of change this is.
+  int32_t key;  // Key of the affected row.
+  optional<int32_t> val;  // Value written; boost::none is printed as NULL.
+};
+
 const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_INSERT_PK_ONLY,
                                   TEST_UPSERT,
@@ -189,7 +226,8 @@ const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_MAJOR_COMPACT_DELTAS,
                                   TEST_COMPACT_TABLET,
                                   TEST_RESTART_TS,
-                                  TEST_SCAN_AT_TIMESTAMP};
+                                  TEST_SCAN_AT_TIMESTAMP,
+                                  TEST_DIFF_SCAN};
 
 const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                      TEST_UPSERT_PK_ONLY,
@@ -201,7 +239,8 @@ const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                      TEST_MAJOR_COMPACT_DELTAS,
                                      TEST_COMPACT_TABLET,
                                      TEST_RESTART_TS,
-                                     TEST_SCAN_AT_TIMESTAMP};
+                                     TEST_SCAN_AT_TIMESTAMP,
+                                     TEST_DIFF_SCAN};
 
 // Test which does only random operations against a tablet, including update and random
 // get (ie scans with equal lower and upper bounds).
@@ -423,6 +462,33 @@ class FuzzTest : public KuduTest {
     }
   }
 
+  // Fully consume all rows in 'scanner', appending one entry per row to 'rows'.
+  // Scanner/row accessor failures go through RETURN_NOT_OK; else returns OK.
+  // If 'is_deleted' is non-null (only in diff scans), also appends each row's
+  // IS_DELETED virtual column value to it, in the same order as 'rows'.
+  Status ScanAllRows(KuduScanner* scanner, vector<ExpectedKeyValueRow>* rows,
+                     vector<bool>* is_deleted) {
+    while (scanner->HasMoreRows()) {
+      KuduScanBatch batch;
+      RETURN_NOT_OK(scanner->NextBatch(&batch));
+      for (KuduScanBatch::RowPtr row : batch) {
+        ExpectedKeyValueRow ret;
+        RETURN_NOT_OK(row.GetInt32(0, &ret.key));  // Column 0 is the key.
+        if (schema_.num_columns() == 2 && !row.IsNull(1)) {  // Non-NULL column 1.
+          ret.val = 0;  // Engage the optional so get_ptr() below is non-null.
+          RETURN_NOT_OK(row.GetInt32(1, ret.val.get_ptr()));
+        }
+        if (is_deleted) {
+          bool b;  // Written by IsDeleted() before it is read.
+          RETURN_NOT_OK(row.IsDeleted(&b));
+          is_deleted->push_back(b);
+        }
+        rows->emplace_back(std::move(ret));
+      }
+    }
+    return Status::OK();
+  }
+
   // Scan the tablet at 'timestamp' and compare the result to the saved values.
   void CheckScanAtTimestamp(int timestamp) {
     KuduScanner s(table_.get());
@@ -430,20 +496,9 @@ class FuzzTest : public KuduTest {
     ASSERT_OK(s.SetSnapshotRaw(timestamp));
     ASSERT_OK(s.SetFaultTolerant());
     ASSERT_OK(s.Open());
+
     vector<ExpectedKeyValueRow> found;
-    while (s.HasMoreRows()) {
-      KuduScanBatch batch;
-      ASSERT_OK(s.NextBatch(&batch));
-      for (KuduScanBatch::RowPtr row : batch) {
-        ExpectedKeyValueRow ret;
-        ASSERT_OK(row.GetInt32(0, &ret.key));
-        if (schema_.num_columns() > 1 && !row.IsNull(1)) {
-          ret.val = 0;
-          ASSERT_OK(row.GetInt32(1, ret.val.get_ptr()));
-        }
-        found.push_back(ret);
-      }
-    }
+    ASSERT_OK(ScanAllRows(&s, &found, nullptr));
 
     list<string> errors;
     CheckRowsMatchAtTimestamp(timestamp, std::move(found), &errors);
@@ -457,6 +512,133 @@ class FuzzTest : public KuduTest {
     }
   }
 
+  // Diff scan the tablet from 'start_timestamp' to 'end_timestamp' and compare
+  // the result to the expectation rebuilt from 'saved_redos_'.
+  void CheckDiffScan(int start_timestamp, int end_timestamp) {
+    KuduScanner s(table_.get());
+    ASSERT_OK(s.SetDiffScan(start_timestamp, end_timestamp));
+    ASSERT_OK(s.Open());
+
+    vector<ExpectedKeyValueRow> found;
+    vector<bool> found_is_deleted;  // Parallel to 'found': one entry per row.
+    ASSERT_OK(ScanAllRows(&s, &found, &found_is_deleted));
+
+    if (VLOG_IS_ON(1)) {
+      for (int i = 0; i < found.size(); i++) {
+        VLOG(1) << Substitute("Diff scan result: $0$1", found[i].ToString(),
+                              found_is_deleted[i] ? " (deleted)" : "");
+      }
+    }
+
+    // Use saved_redos_ to reconstruct the expected results of the diff scan.
+    //
+    // 'selected_rows' tracks which row keys are expected in the scan results
+    // using the select criteria (start_timestamp <= redo ts < end_timestamp).
+    //
+    // 'expected_rows' tracks expected values of rows and is built up using the
+    // apply criteria (redo ts < end_timestamp). After all relevant redos are
+    // processed, rows not in 'selected_rows' are pruned and compared to the scan.
+    //
+    // 'is_deleted_start' and 'is_deleted_end' track liveness for each row at
+    // the beginning and end of the time range. If a row is dead in both, it
+    // shouldn't be in the diff scan results.
+    vector<bool> selected_rows(FLAGS_keyspace_size);
+    vector<ExpectedKeyValueRow> expected_rows(FLAGS_keyspace_size);
+    vector<bool> is_deleted_start(FLAGS_keyspace_size, true);  // Dead by default.
+    vector<bool> is_deleted_end(FLAGS_keyspace_size, true);  // Dead by default.
+
+    for (const auto& e : saved_redos_) {
+      int ts = e.first;
+      const auto& redos = e.second;
+      if (redos.empty()) {
+        continue;
+      }
+      VLOG(1) << "Processing redos for ts @" << ts;
+
+      if (ts >= end_timestamp) {
+        // The redo is beyond the end of the diff scan as per both the select
+        // and apply criteria. We're iterating in ascending timestamp order so
+        // this also means all future redos are irrelevant.
+        if (VLOG_IS_ON(1)) {
+          for (const auto& redo : redos) {
+            VLOG(1) << "Skipping redo " << redo.ToString();
+          }
+          continue;  // Keep iterating only so the later redos get logged too.
+        }
+        break;
+      }
+
+      for (const auto& redo : redos) {
+        VLOG(1) << "Processing redo " << redo.ToString();
+        if (ts >= start_timestamp) {
+          // The redo is relevant as per the select criteria.
+          VLOG(1) << "Selected row " << redo.key;
+
+          if (!selected_rows[redo.key]) {
+            // This is the first relevant redo for this row.
+            is_deleted_start[redo.key] = redo.rtype == INSERT;  // INSERT: was dead.
+            selected_rows[redo.key] = true;
+          }
+        }
+
+        // The redo is relevant as per the apply criteria.
+        is_deleted_end[redo.key] = redo.rtype == DELETE;
+        if (redo.rtype != DELETE) {
+          // Deleted rows still exist in 'expected_rows'. This is OK;
+          // 'expected_is_deleted' will reflect the deletion.
+          expected_rows[redo.key] = { redo.key, redo.val };
+        }
+        VLOG(1) << "New value for row " << redo.key << ": "
+                << expected_rows[redo.key].ToString();
+        VLOG(1) << "New is_deleted for row " << redo.key << ": "
+                << is_deleted_end[redo.key];
+      }
+    }
+    vector<bool> expected_is_deleted = is_deleted_end;
+
+    // Trim the expected results as per 'selected_rows' and start/end liveness.
+    int row_key = 0;  // Key (== original index) of the element being visited.
+    expected_rows.erase(std::remove_if(
+        expected_rows.begin(), expected_rows.end(),
+        [&](const ExpectedKeyValueRow& /*row*/) {
+          bool retval = !selected_rows[row_key] ||
+                        (is_deleted_start[row_key] && is_deleted_end[row_key]);
+          row_key++;  // Relies on remove_if visiting each element once, in order.
+          return retval;
+        }), expected_rows.end());
+    row_key = 0;  // Restart the counter for the second, parallel vector.
+    expected_is_deleted.erase(std::remove_if(
+        expected_is_deleted.begin(), expected_is_deleted.end(),
+        [&](bool /*is_deleted*/) {
+          bool retval = !selected_rows[row_key] ||
+                        (is_deleted_start[row_key] && is_deleted_end[row_key]);
+          row_key++;  // Same in-order assumption as above.
+          return retval;
+        }), expected_is_deleted.end());
+
+    // Test the results. Note that for deleted rows, we can't compare column
+    // values; the server is free to pick whatever historical values it wants.
+    auto fail_diff_scan = [&]() {  // FAIL() returns from this lambda only.
+      FAIL() << "Diff scan verification failed\n"
+             << "Expected IS_DELETED: " << expected_is_deleted << "\n"
+             << "Found IS_DELETED: " << found_is_deleted << "\n"
+             << "Expected rows: " << expected_rows  << "\n"
+             << "Found rows: " << found;
+    };
+    if (expected_is_deleted != found_is_deleted) {
+      NO_FATALS(fail_diff_scan());  // Presumably exits this method on failure.
+    }
+    if (expected_rows.size() != found.size()) {
+      NO_FATALS(fail_diff_scan());
+    }
+    for (int i = 0; i < expected_rows.size(); i++) {
+      if ((expected_is_deleted[i] && expected_rows[i].key != found[i].key) ||
+          (!expected_is_deleted[i] && expected_rows[i] != found[i])) {
+        NO_FATALS(fail_diff_scan());
+      }
+    }
+  }
+
  protected:
   // Validate that the given sequence is valid and would not cause any
   // errors assuming that there are no bugs. For example, checks to make sure there
@@ -478,6 +660,8 @@ class FuzzTest : public KuduTest {
       vector<optional<ExpectedKeyValueRow>>,
       std::greater<int>> saved_values_;
 
+  map<int, vector<Redo>> saved_redos_;
+
   scoped_refptr<TabletReplica> tablet_replica_;
 };
 
@@ -630,6 +814,19 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
         ops->emplace_back(TEST_SCAN_AT_TIMESTAMP, timestamp);
         break;
       }
+      case TEST_DIFF_SCAN: {
+        int start_timestamp = 1;
+        int end_timestamp = 1;
+        if (op_timestamps > 0) {
+          start_timestamp = (rand() % op_timestamps) + 1;
+          end_timestamp = (rand() % op_timestamps) + 1;
+          if (start_timestamp > end_timestamp) {
+            std::swap(start_timestamp, end_timestamp);
+          }
+        }
+        ops->emplace_back(TEST_DIFF_SCAN, start_timestamp, end_timestamp);
+        break;
+      }
       default:
         LOG(FATAL) << "Invalid op type: " << r;
     }
@@ -680,6 +877,7 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
 
   vector<optional<ExpectedKeyValueRow>> cur_val(FLAGS_keyspace_size);
   vector<optional<ExpectedKeyValueRow>> pending_val(FLAGS_keyspace_size);
+  vector<Redo> pending_redos;
 
   int i = 0;
   for (const TestOp& test_op : test_ops) {
@@ -693,24 +891,36 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_INSERT_PK_ONLY:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY: {
+        RedoType rtype = pending_val[test_op.val] ? UPDATE : INSERT;
         pending_val[test_op.val] = InsertOrUpsertRow(
             test_op.val, i++, pending_val[test_op.val], test_op.type);
+
+        // A PK-only UPSERT that is converted into an UPDATE will be dropped
+        // server-side. We must do the same.
+        if (test_op.type != TEST_UPSERT_PK_ONLY || rtype != UPDATE) {
+          pending_redos.emplace_back(rtype, test_op.val, pending_val[test_op.val]->val);
+        }
         break;
       }
       case TEST_UPDATE:
         for (int j = 0; j < update_multiplier; j++) {
           pending_val[test_op.val] = MutateRow(test_op.val, i++);
+          pending_redos.emplace_back(UPDATE, test_op.val, pending_val[test_op.val]->val);
         }
         break;
       case TEST_DELETE:
         pending_val[test_op.val] = DeleteRow(test_op.val);
+        pending_redos.emplace_back(DELETE, test_op.val, boost::none);
         break;
       case TEST_FLUSH_OPS: {
         FlushSessionOrDie(session_);
         cur_val = pending_val;
         int current_time = down_cast<kudu::clock::LogicalClock*>(
             tablet()->clock().get())->GetCurrentTime();
+        VLOG(1) << "Current time: " << current_time;
         saved_values_[current_time] = cur_val;
+        saved_redos_[current_time] = pending_redos;
+        pending_redos.clear();
         break;
       }
       case TEST_FLUSH_TABLET:
@@ -734,6 +944,9 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_SCAN_AT_TIMESTAMP:
         NO_FATALS(CheckScanAtTimestamp(test_op.val));
         break;
+      case TEST_DIFF_SCAN:
+        NO_FATALS(CheckDiffScan(test_op.val, test_op.val2));
+        break;
       default:
         LOG(FATAL) << test_op.type;
     }
@@ -1029,6 +1242,33 @@ TEST_F(FuzzTest, TestUpsert_PKOnlySchema) {
      });
 }
 
+// MRS test for KUDU-2809: a row that has been inserted and deleted within the
+// time range of a diff scan is excluded from the results.
+TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanMRS) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+      {TEST_INSERT, 0},  // Insert the row with key 0.
+      {TEST_FLUSH_OPS},
+      {TEST_DELETE, 0},  // Delete the same row; no tablet flush in between.
+      {TEST_FLUSH_OPS},
+      {TEST_DIFF_SCAN, 4, 7}  // Diff scan: start timestamp 4, end timestamp 7.
+    });
+}
+
+// DRS test for KUDU-2809: a row that has been inserted and deleted within the
+// time range of a diff scan is excluded from the results.
+TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanDRS) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+      {TEST_INSERT, 0},  // Insert the row with key 0.
+      {TEST_FLUSH_OPS},
+      {TEST_FLUSH_TABLET},  // Only difference from the MRS test; presumably puts the row in a DRS.
+      {TEST_DELETE, 0},  // Delete the same row after the tablet flush.
+      {TEST_FLUSH_OPS},
+      {TEST_DIFF_SCAN, 4, 7}  // Diff scan: start timestamp 4, end timestamp 7.
+    });
+}
+
 } // namespace tablet
 } // namespace kudu
 
diff --git a/src/kudu/tablet/key_value_test_schema.h b/src/kudu/tablet/key_value_test_schema.h
index da36990..b8025eb 100644
--- a/src/kudu/tablet/key_value_test_schema.h
+++ b/src/kudu/tablet/key_value_test_schema.h
@@ -42,6 +42,10 @@ struct ExpectedKeyValueRow {
     return key == other.key && val == other.val;
   }
 
+  bool operator!=(const ExpectedKeyValueRow& other) const {  // Inequality.
+    return !(*this == other);  // Negates operator== so the two cannot disagree.
+  }
+
   std::string ToString() const {
     std::string ret = strings::Substitute("{$0,", key);
     if (val == boost::none) {