You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2023/06/05 03:58:05 UTC
[incubator-kvrocks] branch unstable updated: Add the support of the ZRANGESTORE command (#1482)
This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 7554a0dc Add the support of the ZRANGESTORE command (#1482)
7554a0dc is described below
commit 7554a0dc3e0735d7968a5126cc81e060ff1a441b
Author: 纪华裕 <80...@qq.com>
AuthorDate: Mon Jun 5 11:57:58 2023 +0800
Add the support of the ZRANGESTORE command (#1482)
- Add the support of the ZRANGESTORE command
- Change the RangeByLex function signature so that it returns MemberScores instead of Members
---
src/commands/cmd_zset.cc | 141 +++++++++++++++++++++++++++----
src/types/redis_zset.cc | 8 +-
src/types/redis_zset.h | 3 +-
tests/cppunit/types/zset_test.cc | 49 ++++++-----
tests/gocase/unit/type/zset/zset_test.go | 68 +++++++++++++++
5 files changed, 225 insertions(+), 44 deletions(-)
diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc
index 89ab826b..216288fd 100644
--- a/src/commands/cmd_zset.cc
+++ b/src/commands/cmd_zset.cc
@@ -337,6 +337,123 @@ class CommandZMPop : public Commander {
int count_ = 1;
};
+class CommandZRangeStore : public Commander {
+ public:
+ explicit CommandZRangeStore() : range_type_(kZRangeRank), direction_(kZRangeDirectionForward) {}
+
+ Status Parse(const std::vector<std::string> &args) override {
+ dst_ = args[1];
+ src_ = args[2];
+
+ int64_t offset = 0;
+ int64_t count = -1;
+ // skip the <CMD> <dst> <src> <min> <max> args and parse remaining optional arguments
+ CommandParser parser(args, 5);
+ while (parser.Good()) {
+ if (parser.EatEqICase("limit")) {
+ auto parse_offset = parser.TakeInt<int64_t>();
+ auto parse_count = parser.TakeInt<int64_t>();
+ if (!parse_offset || !parse_count) {
+ return {Status::RedisParseErr, errValueNotInteger};
+ }
+ offset = *parse_offset;
+ count = *parse_count;
+ } else if (parser.EatEqICase("bylex")) {
+ range_type_ = kZRangeLex;
+ } else if (parser.EatEqICase("byscore")) {
+ range_type_ = kZRangeScore;
+ } else if (parser.EatEqICase("rev")) {
+ direction_ = kZRangeDirectionReverse;
+ } else {
+ return parser.InvalidSyntax();
+ }
+ }
+
+ if (count != -1 && range_type_ == kZRangeRank) {
+ return {Status::RedisParseErr,
+ "syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX"};
+ }
+
+ // resolve index of <min> <max>
+ int min_idx = 3;
+ int max_idx = 4;
+ if (direction_ == kZRangeDirectionReverse && (range_type_ == kZRangeLex || range_type_ == kZRangeScore)) {
+ min_idx = 4;
+ max_idx = 3;
+ }
+
+ // parse range spec
+ switch (range_type_) {
+ case kZRangeAuto:
+ case kZRangeRank:
+ GET_OR_RET(ParseRangeRankSpec(args[min_idx], args[max_idx], &rank_spec_));
+ if (direction_ == kZRangeDirectionReverse) {
+ rank_spec_.reversed = true;
+ }
+ break;
+ case kZRangeLex:
+ GET_OR_RET(ParseRangeLexSpec(args[min_idx], args[max_idx], &lex_spec_));
+ lex_spec_.offset = offset;
+ lex_spec_.count = count;
+ if (direction_ == kZRangeDirectionReverse) {
+ lex_spec_.reversed = true;
+ }
+ break;
+ case kZRangeScore:
+ GET_OR_RET(ParseRangeScoreSpec(args[min_idx], args[max_idx], &score_spec_));
+ score_spec_.offset = offset;
+ score_spec_.count = count;
+ if (direction_ == kZRangeDirectionReverse) {
+ score_spec_.reversed = true;
+ }
+ break;
+ }
+
+ return Status::OK();
+ }
+
+ Status Execute(Server *svr, Connection *conn, std::string *output) override {
+ redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+
+ std::vector<MemberScore> member_scores;
+
+ rocksdb::Status s;
+ switch (range_type_) {
+ case kZRangeAuto:
+ case kZRangeRank:
+ s = zset_db.RangeByRank(src_, rank_spec_, &member_scores, nullptr);
+ break;
+ case kZRangeScore:
+ s = zset_db.RangeByScore(src_, score_spec_, &member_scores, nullptr);
+ break;
+ case kZRangeLex:
+ s = zset_db.RangeByLex(src_, lex_spec_, &member_scores, nullptr);
+ break;
+ }
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+
+ uint64_t ret = 0;
+ s = zset_db.Add(dst_, ZAddFlags(), &member_scores, &ret);
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+ *output = redis::Integer(ret);
+ return Status::OK();
+ }
+
+ private:
+ std::string src_;
+ std::string dst_;
+ ZRangeType range_type_;
+ ZRangeDirection direction_;
+
+ RangeRankSpec rank_spec_;
+ RangeLexSpec lex_spec_;
+ RangeScoreSpec score_spec_;
+};
+
class CommandZRangeGeneric : public Commander {
public:
explicit CommandZRangeGeneric(ZRangeType range_type = kZRangeAuto, ZRangeDirection direction = kZRangeDirectionAuto)
@@ -430,7 +547,6 @@ class CommandZRangeGeneric : public Commander {
redis::ZSet zset_db(svr->storage, conn->GetNamespace());
std::vector<MemberScore> member_scores;
- std::vector<std::string> members;
rocksdb::Status s;
switch (range_type_) {
@@ -442,29 +558,19 @@ class CommandZRangeGeneric : public Commander {
s = zset_db.RangeByScore(key_, score_spec_, &member_scores, nullptr);
break;
case kZRangeLex:
- s = zset_db.RangeByLex(key_, lex_spec_, &members, nullptr);
+ s = zset_db.RangeByLex(key_, lex_spec_, &member_scores, nullptr);
break;
}
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
- switch (range_type_) {
- case kZRangeLex:
- output->append(redis::MultiBulkString(members, false));
- return Status::OK();
- case kZRangeAuto:
- case kZRangeRank:
- case kZRangeScore:
- output->append(redis::MultiLen(member_scores.size() * (with_scores_ ? 2 : 1)));
- for (const auto &ms : member_scores) {
- output->append(redis::BulkString(ms.member));
- if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
- }
- return Status::OK();
+ output->append(redis::MultiLen(member_scores.size() * (with_scores_ ? 2 : 1)));
+ for (const auto &ms : member_scores) {
+ output->append(redis::BulkString(ms.member));
+ if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
}
-
- return {Status::RedisParseErr, "unexpected range type"};
+ return Status::OK();
}
private:
@@ -817,6 +923,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandZAdd>("zadd", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandZPopMax>("zpopmax", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandZPopMin>("zpopmin", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandZMPop>("zmpop", -4, "write", CommandZMPop::Range),
+ MakeCmdAttr<CommandZRangeStore>("zrangestore", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandZRange>("zrange", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandZRevRange>("zrevrange", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandZRangeByLex>("zrangebylex", -4, "read-only", 1, 1, 1),
diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc
index a9b1ba24..dbe65aa6 100644
--- a/src/types/redis_zset.cc
+++ b/src/types/redis_zset.cc
@@ -421,9 +421,9 @@ rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec &
return rocksdb::Status::OK();
}
-rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, Members *members,
+rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScores *mscores,
uint64_t *removed_cnt) {
- if (members) members->clear();
+ if (mscores) mscores->clear();
uint64_t cnt = 0;
if (!removed_cnt) removed_cnt = &cnt;
@@ -498,10 +498,10 @@ rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec
batch->Delete(score_cf_handle_, score_key);
batch->Delete(iter->key());
} else {
- if (members) members->emplace_back(member.ToString());
+ if (mscores) mscores->emplace_back(MemberScore{member.ToString(), DecodeDouble(iter->value().data())});
}
*removed_cnt += 1;
- if (spec.count > 0 && members && members->size() >= static_cast<unsigned>(spec.count)) break;
+ if (spec.count > 0 && mscores && mscores->size() >= static_cast<unsigned>(spec.count)) break;
}
if (spec.with_deletion && *removed_cnt > 0) {
diff --git a/src/types/redis_zset.h b/src/types/redis_zset.h
index 9282b608..5990580f 100644
--- a/src/types/redis_zset.h
+++ b/src/types/redis_zset.h
@@ -118,7 +118,8 @@ class ZSet : public SubKeyScanner {
uint64_t *removed_cnt);
rocksdb::Status RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScores *mscores,
uint64_t *removed_cnt);
- rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, Members *members, uint64_t *removed_cnt);
+ rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScores *mscores,
+ uint64_t *removed_cnt);
private:
rocksdb::ColumnFamilyHandle *score_cf_handle_;
diff --git a/tests/cppunit/types/zset_test.cc b/tests/cppunit/types/zset_test.cc
index ea0e7225..f7e6e27a 100644
--- a/tests/cppunit/types/zset_test.cc
+++ b/tests/cppunit/types/zset_test.cc
@@ -175,44 +175,48 @@ TEST_F(RedisZSetTest, PopMax) {
TEST_F(RedisZSetTest, RangeByLex) {
uint64_t ret = 0;
+ uint64_t count = fields_.size();
std::vector<MemberScore> mscores;
for (size_t i = 0; i < fields_.size(); i++) {
mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
}
zset_->Add(key_, ZAddFlags::Default(), &mscores, &ret);
- EXPECT_EQ(fields_.size(), ret);
+ EXPECT_EQ(count, ret);
RangeLexSpec spec;
spec.min = fields_[0].ToString();
spec.max = fields_[fields_.size() - 1].ToString();
- std::vector<std::string> members;
- zset_->RangeByLex(key_, spec, &members, nullptr);
- EXPECT_EQ(members.size(), fields_.size());
- for (size_t i = 0; i < members.size(); i++) {
- EXPECT_EQ(members[i], fields_[i].ToString());
+ zset_->RangeByLex(key_, spec, &mscores, nullptr);
+ EXPECT_EQ(mscores.size(), fields_.size());
+ for (size_t i = 0; i < mscores.size(); i++) {
+ EXPECT_EQ(mscores[i].member, fields_[i].ToString());
+ EXPECT_EQ(mscores[i].score, scores_[i]);
}
spec.minex = true;
- zset_->RangeByLex(key_, spec, &members, nullptr);
- EXPECT_EQ(members.size(), fields_.size() - 1);
- for (size_t i = 0; i < members.size(); i++) {
- EXPECT_EQ(members[i], fields_[i + 1].ToString());
+ zset_->RangeByLex(key_, spec, &mscores, nullptr);
+ EXPECT_EQ(mscores.size(), fields_.size() - 1);
+ for (size_t i = 0; i < mscores.size(); i++) {
+ EXPECT_EQ(mscores[i].member, fields_[i + 1].ToString());
+ EXPECT_EQ(mscores[i].score, scores_[i + 1]);
}
spec.minex = false;
spec.maxex = true;
- zset_->RangeByLex(key_, spec, &members, nullptr);
- EXPECT_EQ(members.size(), fields_.size() - 1);
- for (size_t i = 0; i < members.size(); i++) {
- EXPECT_EQ(members[i], fields_[i].ToString());
+ zset_->RangeByLex(key_, spec, &mscores, nullptr);
+ EXPECT_EQ(mscores.size(), fields_.size() - 1);
+ for (size_t i = 0; i < mscores.size(); i++) {
+ EXPECT_EQ(mscores[i].member, fields_[i].ToString());
+ EXPECT_EQ(mscores[i].score, scores_[i]);
}
spec.minex = true;
spec.maxex = true;
- zset_->RangeByLex(key_, spec, &members, nullptr);
- EXPECT_EQ(members.size(), fields_.size() - 2);
- for (size_t i = 0; i < members.size(); i++) {
- EXPECT_EQ(members[i], fields_[i + 1].ToString());
+ zset_->RangeByLex(key_, spec, &mscores, nullptr);
+ EXPECT_EQ(mscores.size(), fields_.size() - 2);
+ for (size_t i = 0; i < mscores.size(); i++) {
+ EXPECT_EQ(mscores[i].member, fields_[i + 1].ToString());
+ EXPECT_EQ(mscores[i].score, scores_[i + 1]);
}
spec.minex = false;
spec.maxex = false;
@@ -220,10 +224,11 @@ TEST_F(RedisZSetTest, RangeByLex) {
spec.max = "+";
spec.max_infinite = true;
spec.reversed = true;
- zset_->RangeByLex(key_, spec, &members, nullptr);
- EXPECT_EQ(members.size(), fields_.size());
- for (size_t i = 0; i < members.size(); i++) {
- EXPECT_EQ(members[i], fields_[6 - i].ToString());
+ zset_->RangeByLex(key_, spec, &mscores, nullptr);
+ EXPECT_EQ(mscores.size(), fields_.size());
+ for (size_t i = 0; i < mscores.size(); i++) {
+ EXPECT_EQ(mscores[i].member, fields_[count - i - 1].ToString());
+ EXPECT_EQ(mscores[i].score, scores_[count - i - 1]);
}
zset_->Del(key_);
diff --git a/tests/gocase/unit/type/zset/zset_test.go b/tests/gocase/unit/type/zset/zset_test.go
index cd98d3cd..872e075a 100644
--- a/tests/gocase/unit/type/zset/zset_test.go
+++ b/tests/gocase/unit/type/zset/zset_test.go
@@ -337,6 +337,74 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, encoding s
require.EqualValues(t, 0, rdb.Exists(ctx, "zseta", "zsetb").Val())
})
+ t.Run(fmt.Sprintf("ZRANGESTORE basics - %s", encoding), func(t *testing.T) {
+ rdb.Del(ctx, "zsrc")
+ rdb.Del(ctx, "zdst")
+
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 1, Member: "a"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 3, Member: "b"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 4, Member: "c"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 6, Member: "d"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 9, Member: "g"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 7, Member: "f"})
+
+ rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 1, Stop: 3})
+ require.Equal(t, []string{"b", "c", "d"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+ rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 0, Stop: 2})
+ require.Equal(t, []string{"a", "b", "c", "d"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+ rdb.Del(ctx, "zdst")
+ rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 0, Stop: 0})
+ require.Equal(t, []string{"a"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+ //add none
+ rdb.Del(ctx, "zdst")
+ rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 99, Stop: 99})
+ require.Equal(t, []string{}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+ // rev
+ rdb.Del(ctx, "zdst")
+ rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 1, Stop: 3, Rev: true})
+ require.Equal(t, []string{"c", "d", "f"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+ // byScore
+ rdb.Del(ctx, "zdst")
+ rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 2, Stop: 5, ByScore: true})
+ require.Equal(t, []string{"b", "c"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+ // byScore limit offset count
+ rdb.Del(ctx, "zdst")
+ rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: "1", Stop: "7", ByScore: true, Offset: 2, Count: 3})
+ require.Equal(t, []string{"c", "d", "f"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+ // byLex
+ rdb.Del(ctx, "zdst")
+ rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: "[c", Stop: "[f", ByLex: true})
+ require.Equal(t, []string{"c", "d", "f"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+ // byLex limit offset count
+ rdb.Del(ctx, "zdst")
+ rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: "[a", Stop: "[g", ByLex: true, Offset: 2, Count: 3})
+ require.Equal(t, []string{"c", "d", "f"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+ })
+
+ t.Run(fmt.Sprintf("ZRANGESTORE error - %s", encoding), func(t *testing.T) {
+ rdb.Del(ctx, "zsrc")
+ rdb.Del(ctx, "zdst")
+
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 1, Member: "a"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 3, Member: "b"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 4, Member: "c"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 6, Member: "d"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 9, Member: "g"})
+ rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 7, Member: "f"})
+
+ util.ErrorRegexp(t, rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: "xx", Stop: "ww"}).Err(), ".*not an integer.*")
+ util.ErrorRegexp(t, rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 1}).Err(), ".*not an integer.*")
+ util.ErrorRegexp(t, rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 1, Stop: 3, Count: 1, Offset: 1}).Err(), ".*error.*")
+ })
+
t.Run(fmt.Sprintf("ZRANGE basics - %s", encoding), func(t *testing.T) {
rdb.Del(ctx, "ztmp")
rdb.ZAdd(ctx, "ztmp", redis.Z{Score: 1, Member: "a"})