You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2022/07/11 15:10:46 UTC
[doris] branch master updated: [feature-wip](unique-key-merge-on-write) Add rowset tree, based on interval-tree, DSIP-018[3/3] (#10714)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3b9cb524bc [feature-wip](unique-key-merge-on-write) Add rowset tree, based on interval-tree, DSIP-018[3/3] (#10714)
3b9cb524bc is described below
commit 3b9cb524bc17bf904489a81f6d96e613cea0d54e
Author: zhannngchen <48...@users.noreply.github.com>
AuthorDate: Mon Jul 11 23:10:38 2022 +0800
[feature-wip](unique-key-merge-on-write) Add rowset tree, based on interval-tree, DSIP-018[3/3] (#10714)
* port from rowset-tree from kudu
* use shared_ptr
* some update
* add mock rowset
* some compatibility update
* fix ut fail
* reformat code
---
be/src/olap/olap_common.h | 11 +
be/src/olap/rowset/CMakeLists.txt | 3 +-
be/src/olap/rowset/rowset_tree.cpp | 269 +++++++++++++++++++
be/src/olap/rowset/rowset_tree.h | 138 ++++++++++
be/test/CMakeLists.txt | 1 +
be/test/olap/rowset/rowset_tree_test.cpp | 434 +++++++++++++++++++++++++++++++
be/test/testutil/mock_rowset.h | 96 +++++++
7 files changed, 951 insertions(+), 1 deletion(-)
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 1334957dc5..92a1d6f241 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -414,4 +414,15 @@ struct RowsetId {
}
};
+// Hash functor that allows RowsetId to be used as the key of hashed
+// containers, e.g. a hash map from RowsetId to a rowset.
+struct HashOfRowsetId {
+    // Hashes the hi, mi and lo fields in that order, feeding the result of each
+    // hash64 call in as the seed of the next one.
+    size_t operator()(const RowsetId& rowset_id) const {
+        const size_t initial_seed = 0;
+        const size_t after_hi = HashUtil::hash64(&rowset_id.hi, sizeof(rowset_id.hi), initial_seed);
+        const size_t after_mi = HashUtil::hash64(&rowset_id.mi, sizeof(rowset_id.mi), after_hi);
+        const size_t after_lo = HashUtil::hash64(&rowset_id.lo, sizeof(rowset_id.lo), after_mi);
+        return after_lo;
+    }
+};
+
} // namespace doris
diff --git a/be/src/olap/rowset/CMakeLists.txt b/be/src/olap/rowset/CMakeLists.txt
index 131978a9c4..cd5a61636d 100644
--- a/be/src/olap/rowset/CMakeLists.txt
+++ b/be/src/olap/rowset/CMakeLists.txt
@@ -29,6 +29,7 @@ add_library(Rowset STATIC
alpha_rowset_meta.cpp
beta_rowset.cpp
beta_rowset_reader.cpp
- beta_rowset_writer.cpp)
+ beta_rowset_writer.cpp
+ rowset_tree.cpp)
target_compile_options(Rowset PUBLIC)
diff --git a/be/src/olap/rowset/rowset_tree.cpp b/be/src/olap/rowset/rowset_tree.cpp
new file mode 100644
index 0000000000..d7cd879e28
--- /dev/null
+++ b/be/src/olap/rowset/rowset_tree.cpp
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This file is copied from
+// https://github.com/apache/kudu/blob/master/src/kudu/tablet/rowset_tree.cc
+// and modified by Doris
+
+#include "olap/rowset/rowset_tree.h"
+
+#include <glog/logging.h>
+
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include "gutil/stl_util.h"
+#include "gutil/strings/substitute.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_meta.h"
+#include "util/interval_tree-inl.h"
+#include "util/slice.h"
+
+using std::shared_ptr;
+using std::string;
+using std::unique_ptr;
+using std::vector;
+
+namespace doris {
+
+namespace {
+
+// Strict weak ordering over endpoints, suitable for std::sort.
+// Lexicographic: first by slice, then by rowset pointer, then by segment id,
+// and finally by endpoint type (START sorts before STOP).
+bool RSEndpointBySliceCompare(const RowsetTree::RSEndpoint& a, const RowsetTree::RSEndpoint& b) {
+    const int slice_cmp = a.slice_.compare(b.slice_);
+    if (slice_cmp != 0) return slice_cmp < 0;
+
+    // The rowsets are independent allocations, so subtracting their addresses is
+    // undefined behaviour. std::less is guaranteed to impose a total order on
+    // arbitrary pointers.
+    const auto* rs_a = a.rowset_.get();
+    const auto* rs_b = b.rowset_.get();
+    if (rs_a != rs_b) return std::less<>()(rs_a, rs_b);
+
+    // Compare the ids directly. Storing the result of '<' in an int and testing
+    // it against zero (as was done before) can never report "a before b".
+    if (a.segment_id_ != b.segment_id_) return a.segment_id_ < b.segment_id_;
+
+    if (a.endpoint_ != b.endpoint_) return a.endpoint_ == RowsetTree::START;
+    return false;
+}
+
+// Wrapper used when making batch queries into the interval tree. It pairs each
+// probe slice with its position in the caller's batch so that a match can be
+// reported back by index.
+struct QueryStruct {
+ // The slice of the operation performing the query.
+ Slice slice;
+ // The original index of this slice in the incoming batch.
+ int idx;
+};
+
+} // anonymous namespace
+
+// Entry for use in the interval tree. RowsetTree::Init creates one entry per
+// segment of every rowset; [min_key, max_key] are that segment's key bounds.
+struct RowsetWithBounds {
+ string min_key;
+ string max_key;
+
+ // NOTE: the ordering of struct fields here is purposeful: we access
+ // min_key and max_key frequently, so putting them first in the struct
+ // ensures they fill a single 64-byte cache line (each is 32 bytes).
+ // The 'rowset' pointer is accessed comparatively rarely.
+ // NOTE(review): the 32-byte figure depends on the standard library's
+ // std::string layout -- confirm for the toolchain in use.
+ RowsetSharedPtr rowset;
+ // Index of the segment inside 'rowset' that these bounds belong to.
+ int32_t segment_id;
+};
+
+// Traits struct for IntervalTree: points are Slices, intervals are pointers to
+// RowsetWithBounds, and all comparisons delegate to Slice::compare.
+struct RowsetIntervalTraits {
+ typedef Slice point_type;
+ typedef RowsetWithBounds* interval_type;
+
+ // Left (inclusive) end of the interval: the entry's min_key.
+ static Slice get_left(const RowsetWithBounds* rs) { return Slice(rs->min_key); }
+
+ // Right end of the interval: the entry's max_key.
+ static Slice get_right(const RowsetWithBounds* rs) { return Slice(rs->max_key); }
+
+ // Three-way comparison of two points.
+ static int compare(const Slice& a, const Slice& b) { return a.compare(b); }
+
+ // Overloads used by batch queries, where the probe is wrapped in a QueryStruct.
+ static int compare(const Slice& a, const QueryStruct& b) { return a.compare(b.slice); }
+
+ static int compare(const QueryStruct& a, const Slice& b) { return -compare(b, a); }
+
+ // When 'a' is std::nullopt, 'type' decides which infinity it stands for:
+ // (1) 'a' is +OO when 'type' is POSITIVE_INFINITY;
+ // (2) 'a' is -OO otherwise.
+ static int compare(const std::optional<Slice>& a, const Slice& b, const EndpointIfNone& type) {
+ if (a == std::nullopt) {
+ return ((POSITIVE_INFINITY == type) ? 1 : -1);
+ }
+
+ return compare(*a, b);
+ }
+
+ // Mirror image of the overload above: the optional endpoint is on the right,
+ // so the result is negated.
+ static int compare(const Slice& a, const std::optional<Slice>& b, const EndpointIfNone& type) {
+ return -compare(b, a, type);
+ }
+};
+
+// Creates an empty, uninitialized tree; the query methods DCHECK that Init()
+// has succeeded before they are used.
+RowsetTree::RowsetTree() : initted_(false) {}
+
+// Builds the tree from 'rowsets'. For every segment of every rowset one
+// interval-tree entry and one START/STOP endpoint pair are created, and the
+// rowsets are indexed by their id.
+//
+// Returns an error, leaving this object unmodified, when:
+//  - the tree has already been initialized,
+//  - a rowset cannot report its segment key bounds,
+//  - a rowset reports a number of key bounds different from its segment count,
+//  - two rowsets share the same rowset id.
+Status RowsetTree::Init(const RowsetVector& rowsets) {
+    if (initted_) {
+        return Status::InternalError("Call Init method on a RowsetTree that's already inited!");
+    }
+    std::vector<RowsetWithBounds*> entries;
+    // Frees whatever 'entries' holds when this function returns: the partially
+    // built list on an early error return, or the previous contents of
+    // 'entries_' after the swap on success.
+    ElementDeleter deleter(&entries);
+    entries.reserve(rowsets.size());
+    std::vector<RSEndpoint> endpoints;
+    endpoints.reserve(rowsets.size() * 2);
+
+    // Iterate over each of the provided Rowsets, fetching their
+    // bounds and adding them to the local vectors.
+    for (const RowsetSharedPtr& rs : rowsets) {
+        std::vector<KeyBoundsPB> segments_key_bounds;
+        Status s = rs->get_segments_key_bounds(&segments_key_bounds);
+        if (!s.ok()) {
+            LOG(WARNING) << "Unable to construct RowsetTree: " << rs->rowset_id()
+                         << " unable to determine its bounds: " << s.to_string();
+            return s;
+        }
+
+        // Checked at runtime rather than with a DCHECK only: in release builds a
+        // mismatch would otherwise index past the end of 'segments_key_bounds'.
+        const size_t num_segments = static_cast<size_t>(rs->num_segments());
+        if (segments_key_bounds.size() != num_segments) {
+            return Status::InternalError(strings::Substitute(
+                    "Rowset $0 has $1 segments but $2 segment key bounds",
+                    rs->rowset_id().to_string(), num_segments, segments_key_bounds.size()));
+        }
+
+        for (size_t i = 0; i < num_segments; ++i) {
+            auto rsit = std::make_unique<RowsetWithBounds>();
+            rsit->rowset = rs;
+            rsit->segment_id = static_cast<int32_t>(i);
+
+            // Load bounds and save entry
+            rsit->min_key = segments_key_bounds[i].min_key();
+            rsit->max_key = segments_key_bounds[i].max_key();
+            DCHECK_LE(rsit->min_key.compare(rsit->max_key), 0)
+                    << "Rowset min: " << rsit->min_key << " must be <= max: " << rsit->max_key;
+
+            // Load into key endpoints. The slices reference the strings owned by
+            // the heap-allocated entry, which outlives the endpoints vector.
+            endpoints.emplace_back(rsit->rowset, static_cast<uint32_t>(i), START, rsit->min_key);
+            endpoints.emplace_back(rsit->rowset, static_cast<uint32_t>(i), STOP, rsit->max_key);
+
+            entries.push_back(rsit.release());
+        }
+    }
+
+    // Build the mapping from RS_ID to RS in a local map first, so that a
+    // duplicate id is detected before any member of this object is touched.
+    decltype(rs_by_id_) rs_by_id;
+    rs_by_id.reserve(rowsets.size());
+    for (const RowsetSharedPtr& rs : rowsets) {
+        if (!rs_by_id.insert({rs->rowset_id(), rs}).second) {
+            return Status::InternalError(strings::Substitute(
+                    "Add rowset with $0 to rowset tree of tablet $1 failed",
+                    rs->rowset_id().to_string(), rs->rowset_meta()->tablet_uid().to_string()));
+        }
+    }
+
+    // Sort endpoints
+    std::sort(endpoints.begin(), endpoints.end(), RSEndpointBySliceCompare);
+
+    // Nothing can fail past this point: install everything into the object.
+    entries_.swap(entries);
+    tree_ = std::make_unique<IntervalTree<RowsetIntervalTraits>>(entries_);
+    key_endpoints_.swap(endpoints);
+    all_rowsets_ = rowsets;
+    rs_by_id_.swap(rs_by_id);
+
+    initted_ = true;
+
+    return Status::OK();
+}
+
+// Appends to 'rowsets' one (rowset, segment id) pair for every entry the
+// interval tree reports as intersecting ['lower_bound', 'upper_bound').
+// A std::nullopt bound leaves that side of the interval open-ended.
+void RowsetTree::FindRowsetsIntersectingInterval(
+        const std::optional<Slice>& lower_bound, const std::optional<Slice>& upper_bound,
+        vector<std::pair<RowsetSharedPtr, int32_t>>* rowsets) const {
+    DCHECK(initted_);
+
+    vector<RowsetWithBounds*> matches;
+    matches.reserve(all_rowsets_.size());
+    tree_->FindIntersectingInterval(lower_bound, upper_bound, &matches);
+
+    // Grow the output once, then copy the matches over in tree order.
+    rowsets->reserve(rowsets->size() + matches.size());
+    for (auto it = matches.begin(); it != matches.end(); ++it) {
+        RowsetWithBounds* entry = *it;
+        rowsets->emplace_back(entry->rowset, entry->segment_id);
+    }
+}
+
+// Appends to 'rowsets' one (rowset, segment id) pair for every entry whose key
+// bounds contain 'encoded_key'.
+void RowsetTree::FindRowsetsWithKeyInRange(
+        const Slice& encoded_key, vector<std::pair<RowsetSharedPtr, int32_t>>* rowsets) const {
+    DCHECK(initted_);
+
+    // Let the interval tree pick out the entries whose [min_key, max_key]
+    // range covers the probe key.
+    vector<RowsetWithBounds*> matches;
+    matches.reserve(all_rowsets_.size());
+    tree_->FindContainingPoint(encoded_key, &matches);
+
+    rowsets->reserve(rowsets->size() + matches.size());
+    for (auto it = matches.begin(); it != matches.end(); ++it) {
+        RowsetWithBounds* entry = *it;
+        rowsets->emplace_back(entry->rowset, entry->segment_id);
+    }
+}
+
+// Invokes 'cb(rowset, index)' for every interval-tree entry whose key bounds
+// contain 'encoded_keys[index]'. Entries are per segment and the segment id is
+// not passed to the callback, so a rowset may be reported more than once for
+// the same index when several of its segments cover that key.
+//
+// REQUIRES: Init() has succeeded and 'encoded_keys' is sorted.
+void RowsetTree::ForEachRowsetContainingKeys(
+        const std::vector<Slice>& encoded_keys,
+        const std::function<void(RowsetSharedPtr, int)>& cb) const {
+    // Same precondition as the other query methods: 'tree_' only exists after Init().
+    DCHECK(initted_);
+    DCHECK(std::is_sorted(encoded_keys.cbegin(), encoded_keys.cend(), Slice::Comparator()));
+    // The interval tree batch query callback would naturally just give us back
+    // the matching Slices, but that won't allow us to easily tell the caller
+    // which specific operation _index_ matched the Rowset. So, we make a vector
+    // of QueryStructs to pair the Slice with its original index.
+    vector<QueryStruct> queries;
+    queries.reserve(encoded_keys.size());
+    for (size_t i = 0; i < encoded_keys.size(); ++i) {
+        // QueryStruct::idx is an int, so the index is narrowed explicitly.
+        queries.push_back(QueryStruct {encoded_keys[i], static_cast<int>(i)});
+    }
+
+    tree_->ForEachIntervalContainingPoints(
+            queries, [&](const QueryStruct& qs, RowsetWithBounds* rs) { cb(rs->rowset, qs.idx); });
+}
+
+// The interval tree does not own its entries; they are released here, in the
+// order they were stored.
+RowsetTree::~RowsetTree() {
+    for (size_t pos = 0; pos < entries_.size(); ++pos) {
+        delete entries_[pos];
+    }
+    entries_.clear();
+}
+
+// Initializes 'new_tree' with the rowsets of 'old_tree' minus those listed in
+// 'rowsets_to_remove' (matched by rowset id), followed by 'rowsets_to_add'.
+// CHECK-fails when the number of removed rowsets differs from the size of
+// 'rowsets_to_remove', or when initializing 'new_tree' fails.
+void ModifyRowSetTree(const RowsetTree& old_tree, const RowsetVector& rowsets_to_remove,
+                      const RowsetVector& rowsets_to_add, RowsetTree* new_tree) {
+    RowsetVector post_swap;
+    int num_removed = 0;
+
+    // O(n^2) diff: every rowset of the old tree is looked up linearly in the
+    // removal list, and kept only when it is not found there.
+    for (const RowsetSharedPtr& rs : old_tree.all_rowsets()) {
+        const bool should_remove =
+                std::any_of(rowsets_to_remove.begin(), rowsets_to_remove.end(),
+                            [&rs](const RowsetSharedPtr& to_remove) {
+                                return to_remove->rowset_id() == rs->rowset_id();
+                            });
+        if (should_remove) {
+            ++num_removed;
+            continue;
+        }
+        post_swap.push_back(rs);
+    }
+
+    CHECK_EQ(num_removed, rowsets_to_remove.size());
+
+    // The newly added rowsets go after the surviving ones.
+    post_swap.insert(post_swap.end(), rowsets_to_add.begin(), rowsets_to_add.end());
+
+    CHECK(new_tree->Init(post_swap).ok());
+}
+
+} // namespace doris
diff --git a/be/src/olap/rowset/rowset_tree.h b/be/src/olap/rowset/rowset_tree.h
new file mode 100644
index 0000000000..92503dbf50
--- /dev/null
+++ b/be/src/olap/rowset/rowset_tree.h
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This file is copied from
+// https://github.com/apache/kudu/blob/master/src/kudu/tablet/rowset_tree.h
+// and modified by Doris
+
+#pragma once
+
+#include <boost/optional/optional.hpp>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "common/status.h"
+#include "gutil/map-util.h"
+#include "olap/rowset/rowset.h"
+#include "util/slice.h"
+
+namespace doris {
+
+template <class Traits>
+class IntervalTree;
+
+struct RowsetIntervalTraits;
+struct RowsetWithBounds;
+
+// Used often enough, may as well typedef it.
+typedef std::vector<RowsetSharedPtr> RowsetVector;
+
+// Class which encapsulates the set of rowsets which are active for a given
+// Tablet. This provides efficient lookup by key for Rowsets which may overlap
+// that key range.
+//
+// Additionally, the rowset tree maintains information about the implicit
+// intervals generated by the row sets (for instance, if a tablet has
+// rowsets [0, 2] and [1, 3] it has three implicit contiguous intervals:
+// [0, 1], [1, 2], and [2, 3]).
+class RowsetTree {
+public:
+    // An RSEndpoint associates a rowset segment, an EndpointType
+    // (either the START or STOP of an interval), and the key at which the
+    // endpoint is located.
+    enum EndpointType { START, STOP };
+    struct RSEndpoint {
+        RSEndpoint(RowsetSharedPtr rowset, uint32_t segment_id, EndpointType endpoint, Slice slice)
+                : rowset_(rowset), segment_id_(segment_id), endpoint_(endpoint), slice_(slice) {}
+
+        RowsetSharedPtr rowset_;
+        uint32_t segment_id_;
+        enum EndpointType endpoint_;
+        // Does not own the key bytes it refers to.
+        Slice slice_;
+    };
+
+    RowsetTree();
+    // Builds the tree from 'rowsets'. Returns a non-OK status if the tree was
+    // already initialized or the rowsets cannot be indexed.
+    Status Init(const RowsetVector& rowsets);
+    ~RowsetTree();
+
+    // Find all rowset segments whose key range may contain the given encoded
+    // key. One (rowset, segment id) pair per match is appended to 'rowsets'.
+    void FindRowsetsWithKeyInRange(const Slice& encoded_key,
+                                   std::vector<std::pair<RowsetSharedPtr, int32_t>>* rowsets) const;
+
+    // Call 'cb(rowset, index)' for each (rowset, index) pair such that
+    // 'encoded_keys[index]' may be within the bounds of 'rowset'.
+    //
+    // See IntervalTree::ForEachIntervalContainingPoints for additional
+    // information on the particular order in which the callback will be called.
+    //
+    // REQUIRES: 'encoded_keys' must be in sorted order.
+    void ForEachRowsetContainingKeys(const std::vector<Slice>& encoded_keys,
+                                     const std::function<void(RowsetSharedPtr, int)>& cb) const;
+
+    // Appends to 'rowsets' one (rowset, segment id) pair for every segment whose
+    // key range intersects the query interval.
+    //
+    // When 'lower_bound' is std::nullopt, it means negative infinity.
+    // When 'upper_bound' is std::nullopt, it means positive infinity.
+    // So the query interval can be one of below:
+    //  [-OO, +OO)
+    //  [-OO, upper_bound)
+    //  [lower_bound, +OO)
+    //  [lower_bound, upper_bound)
+    void FindRowsetsIntersectingInterval(
+            const std::optional<Slice>& lower_bound, const std::optional<Slice>& upper_bound,
+            std::vector<std::pair<RowsetSharedPtr, int32_t>>* rowsets) const;
+
+    const RowsetVector& all_rowsets() const { return all_rowsets_; }
+
+    // Returns the rowset with the given id, or a null pointer if this tree does
+    // not contain it.
+    RowsetSharedPtr rs_by_id(RowsetId rs_id) const { return FindPtrOrNull(rs_by_id_, rs_id); }
+
+    // Iterates over RowsetTree::RSEndpoint, guaranteed to be ordered by key.
+    // Every segment of every rowset appears exactly twice: once at its minimum
+    // key (START) and once at its maximum key (STOP).
+    const std::vector<RSEndpoint>& key_endpoints() const { return key_endpoints_; }
+
+private:
+    // Interval tree of the rowsets. Used to efficiently find rowsets which might contain
+    // a probe row.
+    std::unique_ptr<IntervalTree<RowsetIntervalTraits>> tree_;
+
+    // Sorted list of all the interval endpoints, holding the implicit contiguous
+    // intervals
+    std::vector<RSEndpoint> key_endpoints_;
+
+    // Container for all of the entries in tree_. IntervalTree does
+    // not itself manage memory, so this provides a simple way to enumerate
+    // all the entry structs and free them in the destructor.
+    std::vector<RowsetWithBounds*> entries_;
+
+    // All of the rowsets which were put in this RowsetTree.
+    RowsetVector all_rowsets_;
+
+    // The Rowsets in this RowsetTree, keyed by their id.
+    std::unordered_map<RowsetId, RowsetSharedPtr, HashOfRowsetId> rs_by_id_;
+
+    // Set once Init() has completed successfully.
+    bool initted_;
+};
+
+// Initializes 'new_tree' with the rowsets of 'old_tree', excluding those whose
+// id matches a rowset in 'rowsets_to_remove', followed by 'rowsets_to_add'.
+// The implementation CHECK-fails if the number of rowsets removed differs from
+// the size of 'rowsets_to_remove', or if initializing 'new_tree' fails.
+void ModifyRowSetTree(const RowsetTree& old_tree, const RowsetVector& rowsets_to_remove,
+ const RowsetVector& rowsets_to_add, RowsetTree* new_tree);
+
+} // namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 1e2ffc8ade..9c38bb875e 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -180,6 +180,7 @@ set(OLAP_TEST_FILES
olap/rowset/rowset_meta_test.cpp
olap/rowset/beta_rowset_test.cpp
olap/rowset/unique_rowset_id_generator_test.cpp
+ olap/rowset/rowset_tree_test.cpp
# TODO yiguolei: it is using alpha rowset to test, should use beta rowset
#olap/txn_manager_test.cpp
olap/generic_iterators_test.cpp
diff --git a/be/test/olap/rowset/rowset_tree_test.cpp b/be/test/olap/rowset/rowset_tree_test.cpp
new file mode 100644
index 0000000000..0dd20a64fd
--- /dev/null
+++ b/be/test/olap/rowset/rowset_tree_test.cpp
@@ -0,0 +1,434 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// This file is copied from
+// https://github.com/apache/kudu/blob/master/src/kudu/tablet/rowset_tree-test.cc
+// and modified by Doris
+
+#include "olap/rowset/rowset_tree.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include <boost/optional/optional.hpp>
+#include <cstdlib>
+#include <cstring>
+#include <functional>
+#include <memory>
+#include <string>
+#include <tuple>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "gutil/map-util.h"
+#include "gutil/stringprintf.h"
+#include "gutil/strings/substitute.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/unique_rowset_id_generator.h"
+#include "testutil/mock_rowset.h"
+#include "testutil/test_util.h"
+#include "util/slice.h"
+#include "util/stopwatch.hpp"
+
+using std::make_shared;
+using std::shared_ptr;
+using std::string;
+using std::unordered_set;
+using std::vector;
+using strings::Substitute;
+
+namespace doris {
+
+// Fixture with helpers that build MockRowsets, each carrying exactly one
+// segment whose key bounds are given explicitly.
+class TestRowsetTree : public testing::Test {
+public:
+    TestRowsetTree() : rowset_id_generator_({0, 0}) {}
+
+    // Prepares a UNIQUE_KEYS tablet schema shared by all rowsets of a test.
+    void SetUp() override {
+        TabletSchemaPB schema_pb;
+        schema_pb.set_keys_type(UNIQUE_KEYS);
+        schema_.init_from_pb(schema_pb);
+    }
+
+    // Generates 'num_sets' random rowsets. Each has a 4-digit zero-padded
+    // minimum key in [0000, 8999] and a maximum key equal to minimum + 1000.
+    RowsetVector GenerateRandomRowsets(int num_sets) {
+        RowsetVector vec;
+        for (int i = 0; i < num_sets; i++) {
+            int min = rand() % 9000;
+            int max = min + 1000;
+            vec.push_back(create_rowset(StringPrintf("%04d", min), StringPrintf("%04d", max)));
+        }
+        return vec;
+    }
+
+    // Creates a MockRowset with a fresh rowset id and a single segment covering
+    // ['min_key', 'max_key']. 'is_mem_rowset' is forwarded to
+    // MockRowset::create_rowset unchanged.
+    RowsetSharedPtr create_rowset(const string& min_key, const string& max_key,
+                                  bool is_mem_rowset = false) {
+        RowsetMetaPB rs_meta_pb;
+        rs_meta_pb.set_rowset_id_v2(rowset_id_generator_.next_id().to_string());
+        rs_meta_pb.set_num_segments(1);
+        KeyBoundsPB key_bounds;
+        key_bounds.set_min_key(min_key);
+        key_bounds.set_max_key(max_key);
+        KeyBoundsPB* new_key_bounds = rs_meta_pb.add_segments_key_bounds();
+        *new_key_bounds = key_bounds;
+        RowsetMetaSharedPtr meta_ptr = make_shared<RowsetMeta>();
+        meta_ptr->init_from_pb(rs_meta_pb);
+        RowsetSharedPtr res_ptr;
+        MockRowset::create_rowset(&schema_, rowset_path_, meta_ptr, &res_ptr, is_mem_rowset);
+        return res_ptr;
+    }
+
+private:
+    TabletSchema schema_;
+    std::string rowset_path_;
+    UniqueRowsetIdGenerator rowset_id_generator_;
+};
+
+// Builds a tree over the rowsets 0-5, 3-5 and 5-9 and checks point and interval
+// queries, including intervals that are open-ended on either side. Init() is
+// first expected to fail while the vector still contains a rowset created with
+// is_mem_rowset = true.
+TEST_F(TestRowsetTree, TestTree) {
+ RowsetVector vec;
+ vec.push_back(create_rowset("0", "5"));
+ vec.push_back(create_rowset("3", "5"));
+ vec.push_back(create_rowset("5", "9"));
+ vec.push_back(create_rowset("0", "0", true));
+
+ RowsetTree tree;
+ ASSERT_FALSE(tree.Init(vec).ok());
+
+ // Drop the rowset created with is_mem_rowset = true; Init() must now succeed.
+ vec.erase(vec.begin() + 3);
+ ASSERT_TRUE(tree.Init(vec).ok());
+
+ // "2" overlaps 0-5
+ vector<std::pair<RowsetSharedPtr, int32_t>> out;
+ tree.FindRowsetsWithKeyInRange("2", &out);
+ ASSERT_EQ(1, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+
+ // "4" overlaps 0-5, 3-5
+ out.clear();
+ tree.FindRowsetsWithKeyInRange("4", &out);
+ ASSERT_EQ(2, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+ // interval [3,4) overlaps 0-5, 3-5
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(Slice("3"), Slice("4"), &out);
+ ASSERT_EQ(2, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+ // interval [0,2) overlaps 0-5
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(Slice("0"), Slice("2"), &out);
+ ASSERT_EQ(1, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+
+ // interval [5,7) overlaps 0-5, 3-5, 5-9
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(Slice("5"), Slice("7"), &out);
+ ASSERT_EQ(3, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+ ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+ // "3" overlaps 0-5, 3-5
+ out.clear();
+ tree.FindRowsetsWithKeyInRange("3", &out);
+ ASSERT_EQ(2, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+ // "5" overlaps 0-5, 3-5, 5-9
+ out.clear();
+ tree.FindRowsetsWithKeyInRange("5", &out);
+ ASSERT_EQ(3, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+ ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+ // interval [0,5) overlaps 0-5, 3-5
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(Slice("0"), Slice("5"), &out);
+ ASSERT_EQ(2, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+ // interval [3,5) overlaps 0-5, 3-5
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(Slice("3"), Slice("5"), &out);
+ ASSERT_EQ(2, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+ // interval [-OO,3) overlaps 0-5
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(std::nullopt, Slice("3"), &out);
+ ASSERT_EQ(1, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+
+ // interval [-OO,5) overlaps 0-5, 3-5
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(std::nullopt, Slice("5"), &out);
+ ASSERT_EQ(2, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+
+ // interval [-OO,99) overlaps 0-5, 3-5, 5-9
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(std::nullopt, Slice("99"), &out);
+ ASSERT_EQ(3, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+ ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+ // interval [6,+OO) overlaps 5-9
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(Slice("6"), std::nullopt, &out);
+ ASSERT_EQ(1, out.size());
+ ASSERT_EQ(vec[2].get(), out[0].first.get());
+
+ // interval [5,+OO) overlaps 0-5, 3-5, 5-9
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(Slice("5"), std::nullopt, &out);
+ ASSERT_EQ(3, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+ ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+ // interval [4,+OO) overlaps 0-5, 3-5, 5-9
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(Slice("4"), std::nullopt, &out);
+ ASSERT_EQ(3, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+ ASSERT_EQ(vec[2].get(), out[2].first.get());
+
+ // interval [-OO,+OO) overlaps 0-5, 3-5, 5-9
+ out.clear();
+ tree.FindRowsetsIntersectingInterval(std::nullopt, std::nullopt, &out);
+ ASSERT_EQ(3, out.size());
+ ASSERT_EQ(vec[0].get(), out[0].first.get());
+ ASSERT_EQ(vec[1].get(), out[1].first.get());
+ ASSERT_EQ(vec[2].get(), out[2].first.get());
+}
+
+// Builds a tree from 100 rowsets with random string bounds, checks that every
+// rowset returned by a random interval query really intersects the interval,
+// and then verifies ModifyRowSetTree by removing 50 rowsets and adding 10 new
+// ones in a disjoint key range.
+TEST_F(TestRowsetTree, TestTreeRandomized) {
+    enum BoundOperator {
+        BOUND_LESS_THAN,
+        BOUND_LESS_EQUAL,
+        BOUND_GREATER_THAN,
+        BOUND_GREATER_EQUAL,
+        BOUND_EQUAL
+    };
+    // Draws two random integers from [start, start + range_length], renders them
+    // as strings and returns them as a pair ordered according to 'op'. For the
+    // strict operators the draw is repeated until the two strings differ.
+    const auto& GetStringPair = [](const BoundOperator op, int start, int range_length) {
+        while (true) {
+            string s1 = Substitute("$0", rand_rng_int(start, start + range_length));
+            string s2 = Substitute("$0", rand_rng_int(start, start + range_length));
+            int r = strcmp(s1.c_str(), s2.c_str());
+            switch (op) {
+            case BOUND_LESS_THAN:
+                if (r == 0) continue; // equal strings: draw again.
+                [[fallthrough]];
+            case BOUND_LESS_EQUAL:
+                return std::pair<string, string>(std::min(s1, s2), std::max(s1, s2));
+            case BOUND_GREATER_THAN:
+                if (r == 0) continue; // equal strings: draw again.
+                [[fallthrough]];
+            case BOUND_GREATER_EQUAL:
+                return std::pair<string, string>(std::max(s1, s2), std::min(s1, s2));
+            case BOUND_EQUAL:
+                return std::pair<string, string>(s1, s1);
+            }
+        }
+    };
+
+    RowsetVector vec;
+    for (int i = 0; i < 100; i++) {
+        std::pair<string, string> bound = GetStringPair(BOUND_LESS_EQUAL, 1000, 900);
+        ASSERT_LE(bound.first, bound.second);
+        vec.push_back(shared_ptr<Rowset>(create_rowset(bound.first, bound.second)));
+    }
+    RowsetTree tree;
+    ASSERT_TRUE(tree.Init(vec).ok());
+
+    // When lower < upper.
+    vector<std::pair<RowsetSharedPtr, int32_t>> out;
+    for (int i = 0; i < 100; i++) {
+        out.clear();
+        std::pair<string, string> bound = GetStringPair(BOUND_LESS_THAN, 1000, 900);
+        ASSERT_LT(bound.first, bound.second);
+        tree.FindRowsetsIntersectingInterval(Slice(bound.first), Slice(bound.second), &out);
+        for (const auto& e : out) {
+            std::vector<KeyBoundsPB> segments_key_bounds;
+            // Do not silently continue with empty bounds if the lookup fails.
+            ASSERT_TRUE(e.first->get_segments_key_bounds(&segments_key_bounds).ok());
+            ASSERT_EQ(1, segments_key_bounds.size());
+            string min = segments_key_bounds[0].min_key();
+            string max = segments_key_bounds[0].max_key();
+            // A returned rowset [min, max] must intersect [bound.first, bound.second).
+            if (min < bound.first) {
+                ASSERT_GE(max, bound.first);
+            } else {
+                ASSERT_LT(min, bound.second);
+            }
+            if (max >= bound.second) {
+                ASSERT_LT(min, bound.second);
+            } else {
+                ASSERT_GE(max, bound.first);
+            }
+        }
+    }
+
+    // Remove 50 rowsets, add 10 new rowsets, with non overlapping key range.
+    RowsetVector vec_to_del(vec.begin(), vec.begin() + 50);
+    RowsetVector vec_to_add;
+    for (int i = 0; i < 10; i++) {
+        std::pair<string, string> bound = GetStringPair(BOUND_LESS_EQUAL, 2000, 900);
+        ASSERT_LE(bound.first, bound.second);
+        vec_to_add.push_back(shared_ptr<Rowset>(create_rowset(bound.first, bound.second)));
+    }
+
+    RowsetTree new_tree;
+    ModifyRowSetTree(tree, vec_to_del, vec_to_add, &new_tree);
+
+    // only 50 rowsets left in old key range "1000"-"1900"
+    out.clear();
+    new_tree.FindRowsetsIntersectingInterval(Slice("1000"), Slice("1999"), &out);
+    ASSERT_EQ(50, out.size());
+    // should get 10 new added rowsets with key range "2000"-"2900"
+    out.clear();
+    new_tree.FindRowsetsIntersectingInterval(Slice("2000"), Slice("2999"), &out);
+    ASSERT_EQ(10, out.size());
+    out.clear();
+    new_tree.FindRowsetsIntersectingInterval(Slice("1000"), Slice("2999"), &out);
+    ASSERT_EQ(60, out.size());
+}
+
+// Parameterized fixture for the performance comparison below. The parameter is
+// a tuple of (number of rowsets, number of query points per batch).
+class TestRowsetTreePerformance : public TestRowsetTree,
+ public testing::WithParamInterface<std::tuple<int, int>> {};
+// Runs the test over the cross product of the two value lists.
+INSTANTIATE_TEST_SUITE_P(Parameters, TestRowsetTreePerformance,
+ testing::Combine(
+ // Number of rowsets.
+ // Up to 500 rowsets (500*32MB = 16GB tablet)
+ testing::Values(10, 100, 250, 500),
+ // Number of query points in a batch.
+ testing::Values(10, 100, 500, 1000, 5000)));
+
+// Compares one-at-a-time point lookups against the batched lookup on freshly
+// generated random rowsets. Both approaches must report the same number of
+// matches; the accumulated timings are only logged, not asserted on.
+TEST_P(TestRowsetTreePerformance, TestPerformance) {
+ const int kNumRowsets = std::get<0>(GetParam());
+ const int kNumQueries = std::get<1>(GetParam());
+ const int kNumIterations = AllowSlowTests() ? 1000 : 10;
+
+ // Both timers accumulate across all iterations.
+ MonotonicStopWatch one_at_time_timer;
+ MonotonicStopWatch batch_timer;
+ for (int i = 0; i < kNumIterations; i++) {
+ // Create a bunch of rowsets, each of which spans about 10% of the "row space".
+ // The row space here is 4-digit 0-padded numbers.
+ RowsetVector vec = GenerateRandomRowsets(kNumRowsets);
+
+ RowsetTree tree;
+ ASSERT_TRUE(tree.Init(vec).ok());
+
+ vector<string> queries;
+ for (int j = 0; j < kNumQueries; j++) {
+ int query = rand_rng_int(0, 10000);
+ queries.emplace_back(StringPrintf("%04d", query));
+ }
+
+ int individual_matches = 0;
+ one_at_time_timer.start();
+ {
+ vector<std::pair<RowsetSharedPtr, int32_t>> out;
+ for (const auto& q : queries) {
+ out.clear();
+ tree.FindRowsetsWithKeyInRange(Slice(q), &out);
+ individual_matches += out.size();
+ }
+ }
+ one_at_time_timer.stop();
+
+ // The slices reference the strings in 'queries', which stay alive for the
+ // rest of this iteration.
+ vector<Slice> query_slices;
+ for (const auto& q : queries) {
+ query_slices.emplace_back(q);
+ }
+
+ // The sort required by the batch API is counted as part of the batch cost.
+ batch_timer.start();
+ std::sort(query_slices.begin(), query_slices.end(), Slice::Comparator());
+ int bulk_matches = 0;
+ {
+ tree.ForEachRowsetContainingKeys(
+ query_slices, [&](RowsetSharedPtr rs, int slice_idx) { bulk_matches++; });
+ }
+ batch_timer.stop();
+
+ ASSERT_EQ(bulk_matches, individual_matches);
+ }
+
+ // NOTE(review): elapsed_time() is presumably in nanoseconds, given the
+ // division by 1e6 to obtain milliseconds below -- confirm.
+ double batch_total = batch_timer.elapsed_time();
+ double oat_total = one_at_time_timer.elapsed_time();
+ const string& case_desc = StringPrintf("Q=% 5d R=% 5d", kNumQueries, kNumRowsets);
+ LOG(INFO) << StringPrintf("%s %10s %d ms", case_desc.c_str(), "1-by-1",
+ static_cast<int>(oat_total / 1e6));
+ LOG(INFO) << StringPrintf("%s %10s %d ms (%.2fx)", case_desc.c_str(), "batched",
+ static_cast<int>(batch_total / 1e6),
+ batch_total ? (oat_total / batch_total) : 0);
+}
+
+// Walks RowsetTree::key_endpoints() and verifies that the endpoints are sorted
+// by key, that every rowset is opened exactly once at its minimum key and
+// closed exactly once at its maximum key, and that no interval is left open.
+TEST_F(TestRowsetTree, TestEndpointsConsistency) {
+    const int kNumRowsets = 1000;
+    RowsetVector vec = GenerateRandomRowsets(kNumRowsets);
+    // Add pathological one-key rows
+    for (int i = 0; i < 10; ++i) {
+        vec.push_back(create_rowset(StringPrintf("%04d", 11000), StringPrintf("%04d", 11000)));
+    }
+    vec.push_back(create_rowset(StringPrintf("%04d", 12000), StringPrintf("%04d", 12000)));
+    // Make tree
+    RowsetTree tree;
+    ASSERT_TRUE(tree.Init(vec).ok());
+    // Keep track of "currently open" intervals defined by the endpoints
+    unordered_set<RowsetSharedPtr> open;
+    // Keep track of all rowsets that have been visited
+    unordered_set<RowsetSharedPtr> visited;
+
+    // Key of the previously visited endpoint; empty before the first one.
+    Slice prev;
+    for (const RowsetTree::RSEndpoint& rse : tree.key_endpoints()) {
+        RowsetSharedPtr rs = rse.rowset_;
+        enum RowsetTree::EndpointType ept = rse.endpoint_;
+        const Slice& slice = rse.slice_;
+
+        ASSERT_TRUE(rs != nullptr) << "RowsetTree has an endpoint with no rowset";
+        ASSERT_TRUE(!slice.empty()) << "RowsetTree has an endpoint with no key";
+
+        if (!prev.empty()) {
+            ASSERT_LE(prev.compare(slice), 0);
+        }
+
+        std::vector<KeyBoundsPB> segments_key_bounds;
+        ASSERT_TRUE(rs->get_segments_key_bounds(&segments_key_bounds).ok());
+        ASSERT_EQ(1, segments_key_bounds.size());
+        string min = segments_key_bounds[0].min_key();
+        string max = segments_key_bounds[0].max_key();
+        if (ept == RowsetTree::START) {
+            ASSERT_EQ(min, slice.to_string());
+            ASSERT_TRUE(InsertIfNotPresent(&open, rs));
+            ASSERT_TRUE(InsertIfNotPresent(&visited, rs));
+        } else if (ept == RowsetTree::STOP) {
+            ASSERT_EQ(max, slice.to_string());
+            ASSERT_TRUE(open.erase(rs) == 1);
+        } else {
+            FAIL() << "No such endpoint type exists";
+        }
+
+        // Remember this key so that the next iteration actually performs the
+        // ordering check above; without this 'prev' would stay empty forever.
+        prev = slice;
+    }
+
+    // Every interval that was opened must have been closed again ...
+    ASSERT_TRUE(open.empty());
+    // ... and every rowset handed to the tree must have been seen.
+    ASSERT_EQ(vec.size(), visited.size());
+}
+
+} // namespace doris
diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h
new file mode 100644
index 0000000000..7a1bc4a746
--- /dev/null
+++ b/be/test/testutil/mock_rowset.h
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "olap/rowset/rowset.h"
+
+namespace doris {
+
+// Minimal Rowset subclass for unit tests.
+//
+// Except for get_segments_key_bounds() and do_close(), every overridden
+// operation is a stub: Status-returning ones return Status::NotSupported and
+// bool-returning ones return false. Instances are created through
+// create_rowset(); the constructor is protected.
+class MockRowset : public Rowset {
+public:
+    Status create_reader(std::shared_ptr<RowsetReader>* result) override {
+        return Status::NotSupported("MockRowset not support this method.");
+    }
+
+    Status split_range(const RowCursor& start_key, const RowCursor& end_key,
+                       uint64_t request_block_row_count, size_t key_num,
+                       std::vector<OlapTuple>* ranges) override {
+        return Status::NotSupported("MockRowset not support this method.");
+    }
+
+    Status remove() override {
+        return Status::NotSupported("MockRowset not support this method.");
+    }
+
+    Status link_files_to(const std::string& dir, RowsetId new_rowset_id) override {
+        return Status::NotSupported("MockRowset not support this method.");
+    }
+
+    Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) override {
+        return Status::NotSupported("MockRowset not support this method.");
+    }
+
+    Status remove_old_files(std::vector<std::string>* files_to_remove) override {
+        return Status::NotSupported("MockRowset not support this method.");
+    }
+
+    // Not implemented by the mock: always reports false. Returned as a plain
+    // bool rather than by converting a Status object to bool.
+    bool check_path(const std::string& path) override { return false; }
+
+    // Not implemented by the mock: always reports false.
+    bool check_file_exist() override { return false; }
+
+    // Returns NotSupported when this mock was created with is_mem_rowset == true;
+    // otherwise defers to Rowset::get_segments_key_bounds().
+    Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) override {
+        // TODO(zhangchen): remove this after we implemented memrowset.
+        if (is_mem_rowset_) {
+            return Status::NotSupported("Memtable not support key bounds");
+        }
+        return Rowset::get_segments_key_bounds(segments_key_bounds);
+    }
+
+    // Test factory. Stores a newly allocated MockRowset in *rowset and always
+    // returns Status::OK().
+    //
+    // schema, rowset_path, rowset_meta: forwarded unchanged to the Rowset constructor.
+    // rowset: output parameter; must not be null.
+    // is_mem_rowset: when true, get_segments_key_bounds() on the created object
+    //                returns NotSupported.
+    static Status create_rowset(const TabletSchema* schema, const std::string& rowset_path,
+                                RowsetMetaSharedPtr rowset_meta, RowsetSharedPtr* rowset,
+                                bool is_mem_rowset = false) {
+        // The constructor is protected, so the object is allocated here directly
+        // and configured through the typed pointer before ownership is handed over.
+        auto* mock = new MockRowset(schema, rowset_path, rowset_meta);
+        mock->is_mem_rowset_ = is_mem_rowset;
+        rowset->reset(mock);
+        return Status::OK();
+    }
+
+protected:
+    MockRowset(const TabletSchema* schema, const std::string& rowset_path,
+               RowsetMetaSharedPtr rowset_meta)
+            : Rowset(schema, rowset_path, rowset_meta) {}
+
+    Status init() override {
+        return Status::NotSupported("MockRowset not support this method.");
+    }
+
+    Status do_load(bool use_cache) override {
+        return Status::NotSupported("MockRowset not support this method.");
+    }
+
+    void do_close() override {
+        // Do nothing.
+    }
+
+private:
+    // Set by create_rowset(); makes get_segments_key_bounds() return NotSupported.
+    bool is_mem_rowset_ = false;
+};
+
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org