You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2023/06/05 03:58:05 UTC

[incubator-kvrocks] branch unstable updated: Add the support of the ZRANGESTORE command (#1482)

This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 7554a0dc Add the support of the ZRANGESTORE command  (#1482)
7554a0dc is described below

commit 7554a0dc3e0735d7968a5126cc81e060ff1a441b
Author: 纪华裕 <80...@qq.com>
AuthorDate: Mon Jun 5 11:57:58 2023 +0800

    Add the support of the ZRANGESTORE command  (#1482)
    
    - Add the support of the ZRANGESTORE command
    - Change the RangeByLex function signature so that it outputs MemberScores instead of Members
---
 src/commands/cmd_zset.cc                 | 141 +++++++++++++++++++++++++++----
 src/types/redis_zset.cc                  |   8 +-
 src/types/redis_zset.h                   |   3 +-
 tests/cppunit/types/zset_test.cc         |  49 ++++++-----
 tests/gocase/unit/type/zset/zset_test.go |  68 +++++++++++++++
 5 files changed, 225 insertions(+), 44 deletions(-)

diff --git a/src/commands/cmd_zset.cc b/src/commands/cmd_zset.cc
index 89ab826b..216288fd 100644
--- a/src/commands/cmd_zset.cc
+++ b/src/commands/cmd_zset.cc
@@ -337,6 +337,123 @@ class CommandZMPop : public Commander {
   int count_ = 1;
 };
 
+// ZRANGESTORE <dst> <src> <min> <max> [BYSCORE | BYLEX] [REV] [LIMIT offset count]
+// Reads a range of members from the sorted set <src> and adds them to <dst>.
+class CommandZRangeStore : public Commander {
+ public:
+  explicit CommandZRangeStore() : range_type_(kZRangeRank), direction_(kZRangeDirectionForward) {}
+
+  // Parses the keys, the optional flags and the <min>/<max> range spec.
+  Status Parse(const std::vector<std::string> &args) override {
+    // <min> and <max> are read from args[3] and args[4] below, so reject shorter input
+    // here instead of indexing past the end of `args`.
+    if (args.size() < 5) return {Status::RedisParseErr, "wrong number of arguments"};
+
+    dst_ = args[1];
+    src_ = args[2];
+
+    int64_t offset = 0;
+    int64_t count = -1;
+    // skip the <CMD> <dst> <src> <min> <max> args and parse remaining optional arguments
+    CommandParser parser(args, 5);
+    while (parser.Good()) {
+      if (parser.EatEqICase("limit")) {
+        auto parse_offset = parser.TakeInt<int64_t>();
+        auto parse_count = parser.TakeInt<int64_t>();
+        if (!parse_offset || !parse_count) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+        offset = *parse_offset;
+        count = *parse_count;
+      } else if (parser.EatEqICase("bylex")) {
+        range_type_ = kZRangeLex;
+      } else if (parser.EatEqICase("byscore")) {
+        range_type_ = kZRangeScore;
+      } else if (parser.EatEqICase("rev")) {
+        direction_ = kZRangeDirectionReverse;
+      } else {
+        return parser.InvalidSyntax();
+      }
+    }
+
+    // count is still -1 unless LIMIT supplied another value
+    if (count != -1 && range_type_ == kZRangeRank) {
+      return {Status::RedisParseErr,
+              "syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX"};
+    }
+
+    // resolve index of <min> <max>; they trade places for a reversed BYLEX/BYSCORE range
+    const bool reversed = direction_ == kZRangeDirectionReverse;
+    const bool swapped = reversed && (range_type_ == kZRangeLex || range_type_ == kZRangeScore);
+    const int min_idx = swapped ? 4 : 3;
+    const int max_idx = swapped ? 3 : 4;
+
+    // parse range spec
+    switch (range_type_) {
+      case kZRangeAuto:
+      case kZRangeRank:
+        GET_OR_RET(ParseRangeRankSpec(args[min_idx], args[max_idx], &rank_spec_));
+        if (reversed) rank_spec_.reversed = true;
+        break;
+      case kZRangeLex:
+        GET_OR_RET(ParseRangeLexSpec(args[min_idx], args[max_idx], &lex_spec_));
+        lex_spec_.offset = offset;
+        lex_spec_.count = count;
+        if (reversed) lex_spec_.reversed = true;
+        break;
+      case kZRangeScore:
+        GET_OR_RET(ParseRangeScoreSpec(args[min_idx], args[max_idx], &score_spec_));
+        score_spec_.offset = offset;
+        score_spec_.count = count;
+        if (reversed) score_spec_.reversed = true;
+        break;
+    }
+
+    return Status::OK();
+  }
+
+  // Collects the requested range of <src>, adds it to <dst> and replies with the count
+  // reported by ZSet::Add.
+  // NOTE(review): <dst> is not cleared first, so members already stored there are kept;
+  // confirm whether <dst> should be overwritten instead.
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+
+    std::vector<MemberScore> member_scores;
+
+    rocksdb::Status s;
+    switch (range_type_) {
+      case kZRangeAuto:
+      case kZRangeRank:
+        s = zset_db.RangeByRank(src_, rank_spec_, &member_scores, nullptr);
+        break;
+      case kZRangeScore:
+        s = zset_db.RangeByScore(src_, score_spec_, &member_scores, nullptr);
+        break;
+      case kZRangeLex:
+        s = zset_db.RangeByLex(src_, lex_spec_, &member_scores, nullptr);
+        break;
+    }
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+
+    uint64_t ret = 0;
+    s = zset_db.Add(dst_, ZAddFlags(), &member_scores, &ret);
+    if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
+    *output = redis::Integer(ret);
+    return Status::OK();
+  }
+
+ private:
+  std::string src_;
+  std::string dst_;
+  ZRangeType range_type_;
+  ZRangeDirection direction_;
+
+  RangeRankSpec rank_spec_;
+  RangeLexSpec lex_spec_;
+  RangeScoreSpec score_spec_;
+};
+
 class CommandZRangeGeneric : public Commander {
  public:
   explicit CommandZRangeGeneric(ZRangeType range_type = kZRangeAuto, ZRangeDirection direction = kZRangeDirectionAuto)
@@ -430,7 +547,6 @@ class CommandZRangeGeneric : public Commander {
     redis::ZSet zset_db(svr->storage, conn->GetNamespace());
 
     std::vector<MemberScore> member_scores;
-    std::vector<std::string> members;
 
     rocksdb::Status s;
     switch (range_type_) {
@@ -442,29 +558,19 @@ class CommandZRangeGeneric : public Commander {
         s = zset_db.RangeByScore(key_, score_spec_, &member_scores, nullptr);
         break;
       case kZRangeLex:
-        s = zset_db.RangeByLex(key_, lex_spec_, &members, nullptr);
+        s = zset_db.RangeByLex(key_, lex_spec_, &member_scores, nullptr);
         break;
     }
     if (!s.ok()) {
       return {Status::RedisExecErr, s.ToString()};
     }
 
-    switch (range_type_) {
-      case kZRangeLex:
-        output->append(redis::MultiBulkString(members, false));
-        return Status::OK();
-      case kZRangeAuto:
-      case kZRangeRank:
-      case kZRangeScore:
-        output->append(redis::MultiLen(member_scores.size() * (with_scores_ ? 2 : 1)));
-        for (const auto &ms : member_scores) {
-          output->append(redis::BulkString(ms.member));
-          if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
-        }
-        return Status::OK();
+    output->append(redis::MultiLen(member_scores.size() * (with_scores_ ? 2 : 1)));
+    for (const auto &ms : member_scores) {
+      output->append(redis::BulkString(ms.member));
+      if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
     }
-
-    return {Status::RedisParseErr, "unexpected range type"};
+    return Status::OK();
   }
 
  private:
@@ -817,6 +923,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandZAdd>("zadd", -4, "write", 1, 1, 1),
                         MakeCmdAttr<CommandZPopMax>("zpopmax", -2, "write", 1, 1, 1),
                         MakeCmdAttr<CommandZPopMin>("zpopmin", -2, "write", 1, 1, 1),
                         MakeCmdAttr<CommandZMPop>("zmpop", -4, "write", CommandZMPop::Range),
+                        MakeCmdAttr<CommandZRangeStore>("zrangestore", -4, "write", 1, 1, 1),
                         MakeCmdAttr<CommandZRange>("zrange", -4, "read-only", 1, 1, 1),
                         MakeCmdAttr<CommandZRevRange>("zrevrange", -4, "read-only", 1, 1, 1),
                         MakeCmdAttr<CommandZRangeByLex>("zrangebylex", -4, "read-only", 1, 1, 1),
diff --git a/src/types/redis_zset.cc b/src/types/redis_zset.cc
index a9b1ba24..dbe65aa6 100644
--- a/src/types/redis_zset.cc
+++ b/src/types/redis_zset.cc
@@ -421,9 +421,9 @@ rocksdb::Status ZSet::RangeByScore(const Slice &user_key, const RangeScoreSpec &
   return rocksdb::Status::OK();
 }
 
-rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, Members *members,
+rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScores *mscores,
                                  uint64_t *removed_cnt) {
-  if (members) members->clear();
+  if (mscores) mscores->clear();
 
   uint64_t cnt = 0;
   if (!removed_cnt) removed_cnt = &cnt;
@@ -498,10 +498,10 @@ rocksdb::Status ZSet::RangeByLex(const Slice &user_key, const RangeLexSpec &spec
       batch->Delete(score_cf_handle_, score_key);
       batch->Delete(iter->key());
     } else {
-      if (members) members->emplace_back(member.ToString());
+      if (mscores) mscores->emplace_back(MemberScore{member.ToString(), DecodeDouble(iter->value().data())});
     }
     *removed_cnt += 1;
-    if (spec.count > 0 && members && members->size() >= static_cast<unsigned>(spec.count)) break;
+    if (spec.count > 0 && mscores && mscores->size() >= static_cast<unsigned>(spec.count)) break;
   }
 
   if (spec.with_deletion && *removed_cnt > 0) {
diff --git a/src/types/redis_zset.h b/src/types/redis_zset.h
index 9282b608..5990580f 100644
--- a/src/types/redis_zset.h
+++ b/src/types/redis_zset.h
@@ -118,7 +118,8 @@ class ZSet : public SubKeyScanner {
                               uint64_t *removed_cnt);
   rocksdb::Status RangeByScore(const Slice &user_key, const RangeScoreSpec &spec, MemberScores *mscores,
                                uint64_t *removed_cnt);
-  rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, Members *members, uint64_t *removed_cnt);
+  rocksdb::Status RangeByLex(const Slice &user_key, const RangeLexSpec &spec, MemberScores *mscores,
+                             uint64_t *removed_cnt);
 
  private:
   rocksdb::ColumnFamilyHandle *score_cf_handle_;
diff --git a/tests/cppunit/types/zset_test.cc b/tests/cppunit/types/zset_test.cc
index ea0e7225..f7e6e27a 100644
--- a/tests/cppunit/types/zset_test.cc
+++ b/tests/cppunit/types/zset_test.cc
@@ -175,44 +175,48 @@ TEST_F(RedisZSetTest, PopMax) {
 
 TEST_F(RedisZSetTest, RangeByLex) {
   uint64_t ret = 0;
+  uint64_t count = fields_.size();
   std::vector<MemberScore> mscores;
   for (size_t i = 0; i < fields_.size(); i++) {
     mscores.emplace_back(MemberScore{fields_[i].ToString(), scores_[i]});
   }
   zset_->Add(key_, ZAddFlags::Default(), &mscores, &ret);
-  EXPECT_EQ(fields_.size(), ret);
+  EXPECT_EQ(count, ret);
 
   RangeLexSpec spec;
   spec.min = fields_[0].ToString();
   spec.max = fields_[fields_.size() - 1].ToString();
-  std::vector<std::string> members;
-  zset_->RangeByLex(key_, spec, &members, nullptr);
-  EXPECT_EQ(members.size(), fields_.size());
-  for (size_t i = 0; i < members.size(); i++) {
-    EXPECT_EQ(members[i], fields_[i].ToString());
+  zset_->RangeByLex(key_, spec, &mscores, nullptr);
+  EXPECT_EQ(mscores.size(), fields_.size());
+  for (size_t i = 0; i < mscores.size(); i++) {
+    EXPECT_EQ(mscores[i].member, fields_[i].ToString());
+    EXPECT_EQ(mscores[i].score, scores_[i]);
   }
 
   spec.minex = true;
-  zset_->RangeByLex(key_, spec, &members, nullptr);
-  EXPECT_EQ(members.size(), fields_.size() - 1);
-  for (size_t i = 0; i < members.size(); i++) {
-    EXPECT_EQ(members[i], fields_[i + 1].ToString());
+  zset_->RangeByLex(key_, spec, &mscores, nullptr);
+  EXPECT_EQ(mscores.size(), fields_.size() - 1);
+  for (size_t i = 0; i < mscores.size(); i++) {
+    EXPECT_EQ(mscores[i].member, fields_[i + 1].ToString());
+    EXPECT_EQ(mscores[i].score, scores_[i + 1]);
   }
 
   spec.minex = false;
   spec.maxex = true;
-  zset_->RangeByLex(key_, spec, &members, nullptr);
-  EXPECT_EQ(members.size(), fields_.size() - 1);
-  for (size_t i = 0; i < members.size(); i++) {
-    EXPECT_EQ(members[i], fields_[i].ToString());
+  zset_->RangeByLex(key_, spec, &mscores, nullptr);
+  EXPECT_EQ(mscores.size(), fields_.size() - 1);
+  for (size_t i = 0; i < mscores.size(); i++) {
+    EXPECT_EQ(mscores[i].member, fields_[i].ToString());
+    EXPECT_EQ(mscores[i].score, scores_[i]);
   }
 
   spec.minex = true;
   spec.maxex = true;
-  zset_->RangeByLex(key_, spec, &members, nullptr);
-  EXPECT_EQ(members.size(), fields_.size() - 2);
-  for (size_t i = 0; i < members.size(); i++) {
-    EXPECT_EQ(members[i], fields_[i + 1].ToString());
+  zset_->RangeByLex(key_, spec, &mscores, nullptr);
+  EXPECT_EQ(mscores.size(), fields_.size() - 2);
+  for (size_t i = 0; i < mscores.size(); i++) {
+    EXPECT_EQ(mscores[i].member, fields_[i + 1].ToString());
+    EXPECT_EQ(mscores[i].score, scores_[i + 1]);
   }
   spec.minex = false;
   spec.maxex = false;
@@ -220,10 +224,11 @@ TEST_F(RedisZSetTest, RangeByLex) {
   spec.max = "+";
   spec.max_infinite = true;
   spec.reversed = true;
-  zset_->RangeByLex(key_, spec, &members, nullptr);
-  EXPECT_EQ(members.size(), fields_.size());
-  for (size_t i = 0; i < members.size(); i++) {
-    EXPECT_EQ(members[i], fields_[6 - i].ToString());
+  zset_->RangeByLex(key_, spec, &mscores, nullptr);
+  EXPECT_EQ(mscores.size(), fields_.size());
+  for (size_t i = 0; i < mscores.size(); i++) {
+    EXPECT_EQ(mscores[i].member, fields_[count - i - 1].ToString());
+    EXPECT_EQ(mscores[i].score, scores_[count - i - 1]);
   }
 
   zset_->Del(key_);
diff --git a/tests/gocase/unit/type/zset/zset_test.go b/tests/gocase/unit/type/zset/zset_test.go
index cd98d3cd..872e075a 100644
--- a/tests/gocase/unit/type/zset/zset_test.go
+++ b/tests/gocase/unit/type/zset/zset_test.go
@@ -337,6 +337,74 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, encoding s
 		require.EqualValues(t, 0, rdb.Exists(ctx, "zseta", "zsetb").Val())
 	})
 
+	t.Run(fmt.Sprintf("ZRANGESTORE basics - %s", encoding), func(t *testing.T) {
+		rdb.Del(ctx, "zsrc")
+		rdb.Del(ctx, "zdst")
+
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 1, Member: "a"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 3, Member: "b"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 4, Member: "c"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 6, Member: "d"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 9, Member: "g"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 7, Member: "f"})
+
+		rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 1, Stop: 3})
+		require.Equal(t, []string{"b", "c", "d"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+		rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 0, Stop: 2})
+		require.Equal(t, []string{"a", "b", "c", "d"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+		rdb.Del(ctx, "zdst")
+		rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 0, Stop: 0})
+		require.Equal(t, []string{"a"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+		//add none
+		rdb.Del(ctx, "zdst")
+		rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 99, Stop: 99})
+		require.Equal(t, []string{}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+		// rev
+		rdb.Del(ctx, "zdst")
+		rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 1, Stop: 3, Rev: true})
+		require.Equal(t, []string{"c", "d", "f"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+		// byScore
+		rdb.Del(ctx, "zdst")
+		rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 2, Stop: 5, ByScore: true})
+		require.Equal(t, []string{"b", "c"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+		// byScore limit offset count
+		rdb.Del(ctx, "zdst")
+		rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: "1", Stop: "7", ByScore: true, Offset: 2, Count: 3})
+		require.Equal(t, []string{"c", "d", "f"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+		// byLex
+		rdb.Del(ctx, "zdst")
+		rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: "[c", Stop: "[f", ByLex: true})
+		require.Equal(t, []string{"c", "d", "f"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+
+		// byLex limit offset count
+		rdb.Del(ctx, "zdst")
+		rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: "[a", Stop: "[g", ByLex: true, Offset: 2, Count: 3})
+		require.Equal(t, []string{"c", "d", "f"}, rdb.ZRange(ctx, "zdst", 0, -1).Val())
+	})
+
+	t.Run(fmt.Sprintf("ZRANGESTORE error - %s", encoding), func(t *testing.T) {
+		rdb.Del(ctx, "zsrc")
+		rdb.Del(ctx, "zdst")
+
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 1, Member: "a"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 3, Member: "b"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 4, Member: "c"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 6, Member: "d"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 9, Member: "g"})
+		rdb.ZAdd(ctx, "zsrc", redis.Z{Score: 7, Member: "f"})
+
+		util.ErrorRegexp(t, rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: "xx", Stop: "ww"}).Err(), ".*not an integer.*")
+		util.ErrorRegexp(t, rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 1}).Err(), ".*not an integer.*")
+		util.ErrorRegexp(t, rdb.ZRangeStore(ctx, "zdst", redis.ZRangeArgs{Key: "zsrc", Start: 1, Stop: 3, Count: 1, Offset: 1}).Err(), ".*error.*")
+	})
+
 	t.Run(fmt.Sprintf("ZRANGE basics - %s", encoding), func(t *testing.T) {
 		rdb.Del(ctx, "ztmp")
 		rdb.ZAdd(ctx, "ztmp", redis.Z{Score: 1, Member: "a"})