You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kvrocks.apache.org by "Yangsx-1 (via GitHub)" <gi...@apache.org> on 2023/06/10 15:18:48 UTC

[GitHub] [incubator-kvrocks] Yangsx-1 opened a new pull request, #1490: Add the support of the BZMPOP command

Yangsx-1 opened a new pull request, #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490

   Close #1445 
   This PR add supports of [bzmpop](https://redis.io/commands/bzmpop/), [bzpopmax](https://redis.io/commands/bzpopmax/), [bzpopmin](https://redis.io/commands/bzpopmin/) command like Redis.
   And I also add some tests for these three commands.
   (It's my first PR for kvrocks, there maybe something i didn't recognize. I'm very glad to revise it if there is something wrong.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231902054


##########
src/commands/cmd_zset.cc:
##########
@@ -221,64 +223,217 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}

Review Comment:
   The differences between zpopmin/max and bzpopmin/max is that: 1.bzpopmin/max have an additional 'timeout' input parameter 2.bzpopmin/max have an additional 'key' output. The inner processing logic is the same.
   In my first edition of the implementation, i implement the bzpopmin/max independent of zpopmin/max. But i received a comment that it has something duplicate. Maybe i misunderstood it. 
   But i think it's Ok? There are also some 'block' in the implementation of bzmpop and zmpop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1595781461

   @Yangsx-1 Right now I don't know :) I'll have a look at the code later.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1586479946

   > Thanks for your contribution!
   > 
   > I notice there are some duplicated code segments, could you try to refactor to avoid these trivially repeated code?
   
   Refactored.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] PragmaTwice commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231886579


##########
src/commands/cmd_zset.cc:
##########
@@ -221,64 +223,217 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}

Review Comment:
   Sorry but, I actually can hardly find the benefit of merging the code for `BZPOP` and `ZPOP`. Most of I can see are lots of `if (block_)` checks to implement different logic separately, which seems just make the code more unreadable.
   
   If you want to merge some thing to prevent duplication, I think you need to find some **common logic** so that these code can be treated uniformly rather than just put more `if`s, e.g.
   
   ```
   f() { block a ... }
   g() { block b ... }
   
   // that is NOT a deduplication
   h() { if(...) { block a ...} else { block b ... } }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231250120


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}

Review Comment:
   The task i do is to implement bzpopmax/bzpopmin, in order to deduplicate, i merge the code of zpopmax/zpopmin together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1595821239

   > @Yangsx-1 I made a PR with my changes to your branch. You can approve it and merge so they should be visible here.
   
   Thanks for your patience and modification!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] git-hulk commented on pull request #1490: Add the support of the BZMPOP command

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1596198258

   Thanks for @Yangsx-1 contribution, you can add the command in https://github.com/apache/incubator-kvrocks-website/blob/main/docs/supported-commands.md if you like. Also thanks for the great review comments from @torwig and @PragmaTwice.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] PragmaTwice commented on pull request #1490: Add the support of the BZMPOP command

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1586078350

   > ![1686471961296](https://user-images.githubusercontent.com/85615957/244924252-e64d70b4-d316-40a0-97e3-3199bed7f139.png) It seems like other things wrong? And it didn't happen in my forked repo's [workflow](https://github.com/Yangsx-1/incubator-kvrocks/actions/runs/5234167251).
   
   It may be a false positive of ThreadSanitizer about TBB. Dont worry.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231285295


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();

Review Comment:
   @Yangsx-1 Personally, I don't like functions with the `Try` prefix. 
   Does the function `ReplyForBZPop` have something specific for blocking pop from a sorted set that you named it in this way?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231271226


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();

Review Comment:
   @Yangsx-1 I appreciate your effort to remove duplication - it's the right way to write clean code. However, right now, as I mentioned before, the `TryPopFromMultiZset` pops and sends only an error reply and stores popped elements in a class member variable - a lot of actions just for a single function. Maybe, at least, it can return status and then you will check it and take appropriate actions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1226502426


##########
src/commands/cmd_zset.cc:
##########
@@ -301,40 +450,149 @@ class CommandZMPop : public Commander {
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
+    }
+
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
+    }
+
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {
+    redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+    rocksdb::Status s;
+    std::string output;
     for (auto &user_key : keys_) {
       std::vector<MemberScore> member_scores;
-      auto s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
+      s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
       if (!s.ok()) {
-        return {Status::RedisExecErr, s.ToString()};
+        conn_->Reply(redis::Error("ERR " + s.ToString()));
+        break;
       }
       if (member_scores.empty()) {
         continue;
       }
-
-      output->append(redis::MultiLen(2));
-      output->append(redis::BulkString(user_key));
-      output->append(redis::MultiLen(member_scores.size() * 2));
+      output.append(redis::MultiLen(2));
+      output.append(redis::BulkString(user_key));
+      output.append(redis::MultiLen(member_scores.size() * 2));
       for (const auto &ms : member_scores) {
-        output->append(redis::BulkString(ms.member));
-        output->append(redis::BulkString(util::Float2String(ms.score)));
+        output.append(redis::BulkString(ms.member));
+        output.append(redis::BulkString(util::Float2String(ms.score)));
       }
-      return Status::OK();
+      reply_flag_ = true;
+      break;
     }
-    *output = redis::NilString();
-    return Status::OK();
+    if (output.empty() && !block_) {
+      output = redis::NilString();
+    }
+    conn_->Reply(output);
+    return s;
+  }
+
+  void OnWrite(bufferevent *bev) {
+    auto s = TryPopFromZset();
+    if (!s.ok() || !reply_flag_) {

Review Comment:
   In the case of an error inside `TryPopFromZset`, the function by itself will send an error and then the connection allegedly will stay blocked. Am I right?



##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {

Review Comment:
   I saw this function twice in this file. Perhaps, something could be done to remove/minimize duplication.



##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {

Review Comment:
   Here you can write without `else` since you have `return` in the `if` part - it saves some indentation.



##########
src/commands/cmd_zset.cc:
##########
@@ -301,40 +450,149 @@ class CommandZMPop : public Commander {
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
+    }
+
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
+    }
+
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {

Review Comment:
   Actually, the function pops from multiple sorted sets which is not expressed by its name. And, except for popping elements, it sends a reply. It's better to separate concerns: pop and return elements in one function and then analyze (if something was popped/if the mode is blocking/etc) and send the response in another function. Let the function do just one thing.



##########
src/commands/cmd_zset.cc:
##########
@@ -301,40 +450,149 @@ class CommandZMPop : public Commander {
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
+    }
+
+    auto bev = conn->GetBufferEvent();

Review Comment:
   You can move this line closer to the place where `bev` is used, e.g. before the `SetCB(bev);` line.



##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();

Review Comment:
   If `!block_` you do `TryPopFromZset` and if `block_`  actually you do `TryPopFromZset` as well (identical actions inside the `if` and the `else` branches). 
   I'm sure you will refactor this after you refactor/split the `TryPopFromZset` function.



##########
src/commands/cmd_zset.cc:
##########
@@ -301,40 +450,149 @@ class CommandZMPop : public Commander {
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
+    }
+
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
+    }
+
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {
+    redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+    rocksdb::Status s;
+    std::string output;
     for (auto &user_key : keys_) {
       std::vector<MemberScore> member_scores;
-      auto s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
+      s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
       if (!s.ok()) {
-        return {Status::RedisExecErr, s.ToString()};
+        conn_->Reply(redis::Error("ERR " + s.ToString()));

Review Comment:
   In case of an error inside `Pop` you will reply with the error message and then just `break` which only interrupts the `for` loop. And later there is a chance that you will reply with a nil-string or some data that will be in the `output`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] PragmaTwice commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1232074452


##########
src/commands/cmd_zset.cc:
##########
@@ -221,64 +223,217 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);

Review Comment:
   For example, in the `Parse` function you write, it is just two code block separated by `if (block_)`, i.e.
   ```
   // the original code
   ZPOP::Parse() {
     parse ZPOP...
   }
   
   BZPOP::Parse() {
     parse BZPOP...
   }
   
   // your improved code
   Parse() {
     if(!block_) { 
       parse ZPOP ... 
     } 
     parse BZPOP ... 
   }
   ``` 
   
   IMHO it is not related to deduplication, i.e. the duplicated code seems not eliminated at all. 
   And the readabilitiy of the code get even worse in my view.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1233089993


##########
tests/gocase/unit/type/zset/zset_test.go:
##########
@@ -318,6 +318,62 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, encoding s
 		require.Equal(t, []redis.Z{{Score: 10, Member: "a"}}, rdb.ZPopMax(ctx, "ztmp", 3).Val())
 	})
 
+	t.Run(fmt.Sprintf("BZPOPMIN basics - %s", encoding), func(t *testing.T) {
+		rdb.Del(ctx, "zseta")
+		rdb.Del(ctx, "zsetb")
+		rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}, redis.Z{Score: 3, Member: "c"})
+		rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"})
+		require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val())
+		require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val())
+		resultz := rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 1, Member: "a"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 2, Member: "b"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zsetb", "zseta").Val().Z
+		require.Equal(t, redis.Z{Score: 1, Member: "d"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zsetb", "zseta").Val().Z
+		require.Equal(t, redis.Z{Score: 2, Member: "e"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 3, Member: "c"}, resultz)
+		var err = rdb.BZPopMin(ctx, time.Millisecond*1000, "zseta", "zsetb").Err()
+		require.Equal(t, redis.Nil, err)
+
+		rd := srv.NewTCPClient()
+		defer func() { require.NoError(t, rd.Close()) }()
+		require.NoError(t, rd.WriteArgs("bzpopmin", "zseta", "0"))
+		time.Sleep(time.Millisecond * 1000)

Review Comment:
   I think we need to wait for the client `rd` actually blocked. 
   The code has two client, if `rdb` add the key-value before `rd` actually blocked, it will not test out whether the key-value is poped after another client add.
   Other go tests for command with block also have the same `sleep` operation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1226788186


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {

Review Comment:
   I can split the func `TryPopFromZset`, but it is a little difficult or tedious to merge `BZMPOP` and `BZPOPMIN/MAX` together. The ways they parse arguments and reply messages are so different. The only same thing for them in the func `TryPopFromZset` is they pop from the database, although the name are same, maybe it's a bad name. :(



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1595778947

   @torwig Is there anything i need to modify? :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1233107374


##########
tests/gocase/unit/type/zset/zset_test.go:
##########
@@ -318,6 +318,62 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, encoding s
 		require.Equal(t, []redis.Z{{Score: 10, Member: "a"}}, rdb.ZPopMax(ctx, "ztmp", 3).Val())
 	})
 
+	t.Run(fmt.Sprintf("BZPOPMIN basics - %s", encoding), func(t *testing.T) {
+		rdb.Del(ctx, "zseta")
+		rdb.Del(ctx, "zsetb")
+		rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}, redis.Z{Score: 3, Member: "c"})
+		rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"})
+		require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val())
+		require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val())
+		resultz := rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 1, Member: "a"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 2, Member: "b"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zsetb", "zseta").Val().Z
+		require.Equal(t, redis.Z{Score: 1, Member: "d"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zsetb", "zseta").Val().Z
+		require.Equal(t, redis.Z{Score: 2, Member: "e"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 3, Member: "c"}, resultz)
+		var err = rdb.BZPopMin(ctx, time.Millisecond*1000, "zseta", "zsetb").Err()
+		require.Equal(t, redis.Nil, err)
+
+		rd := srv.NewTCPClient()
+		defer func() { require.NoError(t, rd.Close()) }()
+		require.NoError(t, rd.WriteArgs("bzpopmin", "zseta", "0"))
+		time.Sleep(time.Millisecond * 1000)

Review Comment:
   I thought the line `require.NoError(t, rd.WriteArgs("bzpopmin", "zseta", "0"))` is blocking by itself.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1232080274


##########
src/commands/cmd_zset.cc:
##########
@@ -221,64 +223,217 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);

Review Comment:
   Ok, i'll split zpopmin/max, do i also need to split zmpop and bzmpop? 
   I think there are many same codes in zmpop and bzmpop, so the two commands can merge together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1232088462


##########
src/commands/cmd_zset.cc:
##########
@@ -221,64 +223,217 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);

Review Comment:
   @Yangsx-1 You can move the same code into functions (and call them from both commands) but still have two different commands.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1229576455


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {

Review Comment:
   Is there anything i need to fix? :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1226797110


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {

Review Comment:
   @Yangsx-1 Yes, you can have different functions if the logic inside is different. But try to separate retrieving the data from the storage and sending the response - maybe it reveals some possibilities of deduplication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] infdahai commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "infdahai (via GitHub)" <gi...@apache.org>.
infdahai commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1226973166


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {
+    redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+    rocksdb::Status s;
+    for (auto &user_key : keys_) {
+      std::vector<MemberScore> member_scores;
+      s = zset_db.Pop(user_key, count_, min_, &member_scores);
+      if (!s.ok()) {
+        conn_->Reply(redis::Error("ERR " + s.ToString()));
+        break;
+      }
+      if (member_scores.empty() && block_) {
+        continue;
+      }
+      std::string output;
+      if (!block_) {
+        output.append(redis::MultiLen(member_scores.size() * 2));
+      } else {
+        output.append(redis::MultiLen(member_scores.size() * 2 + 1));
+        output.append(redis::BulkString(user_key));
+      }
+      for (const auto &ms : member_scores) {
+        output.append(redis::BulkString(ms.member));
+        output.append(redis::BulkString(util::Float2String(ms.score)));
+      }
+      conn_->Reply(output);
+      reply_flag_ = true;
+      break;
+    }
+    return s;
+  }
+
+  void OnWrite(bufferevent *bev) {
+    auto s = TryPopFromZset();
+    if (!reply_flag_) {
+      // The connection may be waked up but can't pop from list. For example,
+      // connection A is blocking on list and connection B push a new element
+      // then wake up the connection A, but this element may be token by other connection C.
+      // So we need to wait for the wake event again by disabling the WRITE event.
+      bufferevent_disable(bev, EV_WRITE);
+      return;
+    }
+
+    if (timer_) {
+      timer_.reset();
+    }
+
+    unBlockingAll();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
+    // We need to manually trigger the read event since we will stop processing commands
+    // in connection after the blocking command, so there may have some commands to be processed.
+    // Related issue: https://github.com/apache/incubator-kvrocks/issues/831
+    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+  }
+
+  void OnEvent(bufferevent *bev, int16_t events) {
+    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
+      if (timer_ != nullptr) {
+        timer_.reset();
+      }
+      unBlockingAll();
+    }
+    conn_->OnEvent(bev, events);
+  }
+
+  void TimerCB(int, int16_t events) {
+    conn_->Reply(redis::NilString());
+    timer_.reset();
+    unBlockingAll();
+    auto bev = conn_->GetBufferEvent();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
   }
 
  private:
   bool min_;
+  bool block_;
   int count_ = 1;
+  int timeout_;
+  std::vector<std::string> keys_;
+  Server *svr_ = nullptr;
+  Connection *conn_ = nullptr;
+  UniqueEvent timer_;
+  bool reply_flag_ = false;
+
+  void unBlockingAll() {
+    for (const auto &key : keys_) {
+      svr_->UnblockOnKey(key, conn_);
+    }
+  }
 };
 
 class CommandZPopMin : public CommandZPop {
  public:
-  CommandZPopMin() : CommandZPop(true) {}
+  CommandZPopMin() : CommandZPop(true, false) {}
 };
 
 class CommandZPopMax : public CommandZPop {
  public:
-  CommandZPopMax() : CommandZPop(false) {}
+  CommandZPopMax() : CommandZPop(false, false) {}
 };
 
-class CommandZMPop : public Commander {
+class CommandBZPopMin : public CommandZPop {
  public:
-  CommandZMPop() = default;
+  CommandBZPopMin() : CommandZPop(true, true) {}
+};
+
+class CommandBZPopMax : public CommandZPop {
+ public:
+  CommandBZPopMax() : CommandZPop(false, true) {}
+};
+
+class CommandMPop : public Commander,
+                    private EvbufCallbackBase<CommandMPop, false>,
+                    private EventCallbackBase<CommandMPop> {
+ public:
+  explicit CommandMPop(bool block) : block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
     CommandParser parser(args, 1);
+    if (block_) {
+      timeout_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{0, std::numeric_limits<int>::max()}));
+      if (timeout_ < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};

Review Comment:
   You can use `errTimeoutIsNegative` in error_constants.h



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1586072581

   > @Yangsx-1 You can use the `./x.py format` command before committing the code to avoid errors during CI.
   
   Thanks, fixed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1586074933

   ![1686471961296](https://github.com/apache/incubator-kvrocks/assets/85615957/e64d70b4-d316-40a0-97e3-3199bed7f139)
   It seems like other things wrong? And it didn't happen in my forked repo's [workflow](https://github.com/Yangsx-1/incubator-kvrocks/actions/runs/5234167251).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1230974595


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();

Review Comment:
   Could you simplify the logic like the following:
   
   ```
   redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
   
   for (auto &key : keys_) {
     std::vector<MemberScore> member_scores;
     auto s = zset_db.Pop(key, count_, flag_ == ZSET_MIN, &member_scores);
     if (!s.ok()) {
       return {Status::RedisExecErr, s.ToString()};
     }
   
    if (!member_scores.empty()) {
       ReplyWithMemberScores(member_scores); // construct output and send it to conn_
       return Status::OK();
     }
   }
   
   // no elements found in all keys
   if (!block_ || conn->IsInExec()) {
       // reply with nil array
       return Status::OK();
   }
   
   for (const auto &key : keys_) {
     svr_->BlockOnKey(key, conn_);
   }
       
    // the rest of the `Execute` code with `bev` and setting a timer.
   ```



##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}

Review Comment:
   BTW, did you add also `BZPOPMAX` and `BZPOPMIN`?



##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();
     if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+      return Status::OK();  // Error already output
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    if (!block_) {  // ReplyForZPop
+      output->append(redis::MultiLen(member_scores_.size() * 2));
+      for (const auto &ms : member_scores_) {
+        output->append(redis::BulkString(ms.member));
+        output->append(redis::BulkString(util::Float2String(ms.score)));
+      }
+      return Status::OK();
     }
 
-    return Status::OK();
+    if (!member_scores_.empty()) {
+      ReplyForBZPop();
+      return Status::OK();
+    }
+
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    auto bev = conn->GetBufferEvent();
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromMultiZset() {
+    redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+    rocksdb::Status s;
+    for (auto &user_key : keys_) {
+      s = zset_db.Pop(user_key, count_, min_, &member_scores_);
+      if (!s.ok()) {
+        // output here is necessary for block operation to reply error
+        conn_->Reply(redis::Error("ERR " + s.ToString()));
+        break;
+      }
+      if (member_scores_.empty() && block_) {
+        continue;
+      }
+      user_key_ = user_key;
+      break;
+    }
+    return s;
+  }
+
+  void ReplyForBZPop() {
+    std::string output;
+    output.append(redis::MultiLen(member_scores_.size() * 2 + 1));
+    output.append(redis::BulkString(user_key_));
+    for (const auto &ms : member_scores_) {
+      output.append(redis::BulkString(ms.member));
+      output.append(redis::BulkString(util::Float2String(ms.score)));
+    }
+    conn_->Reply(output);
+  }
+
+  void OnWrite(bufferevent *bev) {
+    auto s = TryPopFromMultiZset();
+    if (member_scores_.empty()) {
+      // The connection may be waked up but can't pop from list. For example,
+      // connection A is blocking on list and connection B push a new element
+      // then wake up the connection A, but this element may be token by other connection C.
+      // So we need to wait for the wake event again by disabling the WRITE event.
+      bufferevent_disable(bev, EV_WRITE);
+      return;
+    }
+    ReplyForBZPop();
+
+    if (timer_) {
+      timer_.reset();
+    }
+
+    unBlockingAll();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
+    // We need to manually trigger the read event since we will stop processing commands
+    // in connection after the blocking command, so there may have some commands to be processed.
+    // Related issue: https://github.com/apache/incubator-kvrocks/issues/831
+    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+  }
+
+  void OnEvent(bufferevent *bev, int16_t events) {
+    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
+      if (timer_ != nullptr) {
+        timer_.reset();
+      }
+      unBlockingAll();
+    }
+    conn_->OnEvent(bev, events);
+  }
+
+  void TimerCB(int, int16_t events) {
+    conn_->Reply(redis::NilString());
+    timer_.reset();
+    unBlockingAll();
+    auto bev = conn_->GetBufferEvent();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
   }
 
  private:
   bool min_;
+  bool block_;
   int count_ = 1;
+  int timeout_;
+  std::vector<std::string> keys_;
+  Server *svr_ = nullptr;
+  Connection *conn_ = nullptr;
+  UniqueEvent timer_;
+  std::string user_key_;

Review Comment:
   And you can delete `user_key_` and `member_scores_` as the command class members.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1595819803

   @Yangsx-1 I made a PR with my changes to your branch. You can approve it and merge so they should be visible here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] git-hulk commented on pull request #1490: Add the support of the BZMPOP command

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1596197691

   Thanks all, merging...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231254977


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();

Review Comment:
   The function `TryPopFromMultiZset()` is also used in function `OnWrite` later, i think expanding the code here rather than using a function will cause duplication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1586075896

   > Thanks for your contribution!
   > 
   > I notice there are some duplicated code segments, could you try to refactor to avoid these trivially repeated code?
   
   OK, i'll try, but it may need some time, i'm a little busy these days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#issuecomment-1586006275

   @Yangsx-1 You can use the `./x.py format` command before committing the code to avoid errors during CI.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1229777929


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {

Review Comment:
   @Yangsx-1 Thank you for the changes you've made. I'll review them later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1233109316


##########
tests/gocase/unit/type/zset/zset_test.go:
##########
@@ -318,6 +318,62 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, encoding s
 		require.Equal(t, []redis.Z{{Score: 10, Member: "a"}}, rdb.ZPopMax(ctx, "ztmp", 3).Val())
 	})
 
+	t.Run(fmt.Sprintf("BZPOPMIN basics - %s", encoding), func(t *testing.T) {
+		rdb.Del(ctx, "zseta")
+		rdb.Del(ctx, "zsetb")
+		rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}, redis.Z{Score: 3, Member: "c"})
+		rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"})
+		require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val())
+		require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val())
+		resultz := rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 1, Member: "a"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 2, Member: "b"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zsetb", "zseta").Val().Z
+		require.Equal(t, redis.Z{Score: 1, Member: "d"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zsetb", "zseta").Val().Z
+		require.Equal(t, redis.Z{Score: 2, Member: "e"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 3, Member: "c"}, resultz)
+		var err = rdb.BZPopMin(ctx, time.Millisecond*1000, "zseta", "zsetb").Err()
+		require.Equal(t, redis.Nil, err)
+
+		rd := srv.NewTCPClient()
+		defer func() { require.NoError(t, rd.Close()) }()
+		require.NoError(t, rd.WriteArgs("bzpopmin", "zseta", "0"))
+		time.Sleep(time.Millisecond * 1000)

Review Comment:
   You are right. I test it without sleep. There is no need to sleep here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] git-hulk merged pull request #1490: Add the support of the BZMPOP command

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk merged PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1226462174


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {
+    redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+    rocksdb::Status s;
+    for (auto &user_key : keys_) {
+      std::vector<MemberScore> member_scores;
+      s = zset_db.Pop(user_key, count_, min_, &member_scores);
+      if (!s.ok()) {
+        conn_->Reply(redis::Error("ERR " + s.ToString()));
+        break;
+      }
+      if (member_scores.empty() && block_) {
+        continue;
+      }
+      std::string output;
+      if (!block_) {
+        output.append(redis::MultiLen(member_scores.size() * 2));
+      } else {
+        output.append(redis::MultiLen(member_scores.size() * 2 + 1));
+        output.append(redis::BulkString(user_key));
+      }
+      for (const auto &ms : member_scores) {
+        output.append(redis::BulkString(ms.member));
+        output.append(redis::BulkString(util::Float2String(ms.score)));
+      }
+      conn_->Reply(output);
+      reply_flag_ = true;
+      break;
+    }
+    return s;
+  }
+
+  void OnWrite(bufferevent *bev) {
+    auto s = TryPopFromZset();
+    if (!reply_flag_) {
+      // The connection may be waked up but can't pop from list. For example,
+      // connection A is blocking on list and connection B push a new element
+      // then wake up the connection A, but this element may be token by other connection C.
+      // So we need to wait for the wake event again by disabling the WRITE event.
+      bufferevent_disable(bev, EV_WRITE);
+      return;
+    }
+
+    if (timer_) {
+      timer_.reset();
+    }
+
+    unBlockingAll();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
+    // We need to manually trigger the read event since we will stop processing commands
+    // in connection after the blocking command, so there may have some commands to be processed.
+    // Related issue: https://github.com/apache/incubator-kvrocks/issues/831
+    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+  }
+
+  void OnEvent(bufferevent *bev, int16_t events) {
+    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
+      if (timer_ != nullptr) {
+        timer_.reset();
+      }
+      unBlockingAll();
+    }
+    conn_->OnEvent(bev, events);
+  }
+
+  void TimerCB(int, int16_t events) {
+    conn_->Reply(redis::NilString());
+    timer_.reset();
+    unBlockingAll();
+    auto bev = conn_->GetBufferEvent();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
   }
 
  private:
   bool min_;
+  bool block_;
   int count_ = 1;
+  int timeout_;
+  std::vector<std::string> keys_;
+  Server *svr_ = nullptr;
+  Connection *conn_ = nullptr;
+  UniqueEvent timer_;
+  bool reply_flag_ = false;
+
+  void unBlockingAll() {
+    for (const auto &key : keys_) {
+      svr_->UnblockOnKey(key, conn_);
+    }
+  }
 };
 
 class CommandZPopMin : public CommandZPop {
  public:
-  CommandZPopMin() : CommandZPop(true) {}
+  CommandZPopMin() : CommandZPop(true, false) {}
 };
 
 class CommandZPopMax : public CommandZPop {
  public:
-  CommandZPopMax() : CommandZPop(false) {}
+  CommandZPopMax() : CommandZPop(false, false) {}
 };
 
-class CommandZMPop : public Commander {
+class CommandBZPopMin : public CommandZPop {
  public:
-  CommandZMPop() = default;
+  CommandBZPopMin() : CommandZPop(true, true) {}
+};
+
+class CommandBZPopMax : public CommandZPop {
+ public:
+  CommandBZPopMax() : CommandZPop(false, true) {}
+};
+
+class CommandMPop : public Commander,
+                    private EvbufCallbackBase<CommandMPop, false>,
+                    private EventCallbackBase<CommandMPop> {
+ public:
+  explicit CommandMPop(bool block) : block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
     CommandParser parser(args, 1);
+    if (block_) {
+      timeout_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{0, std::numeric_limits<int>::max()}));
+      if (timeout_ < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};

Review Comment:
   It's not critical, but in this case `Redis` returns `timeout is negative`. 
   I prefer to launch `redis-server` and `redis-cli` locally to test such cases and see the exact error messages.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1233086245


##########
tests/gocase/unit/type/zset/zset_test.go:
##########
@@ -318,6 +318,62 @@ func basicTests(t *testing.T, rdb *redis.Client, ctx context.Context, encoding s
 		require.Equal(t, []redis.Z{{Score: 10, Member: "a"}}, rdb.ZPopMax(ctx, "ztmp", 3).Val())
 	})
 
+	t.Run(fmt.Sprintf("BZPOPMIN basics - %s", encoding), func(t *testing.T) {
+		rdb.Del(ctx, "zseta")
+		rdb.Del(ctx, "zsetb")
+		rdb.ZAdd(ctx, "zseta", redis.Z{Score: 1, Member: "a"}, redis.Z{Score: 2, Member: "b"}, redis.Z{Score: 3, Member: "c"})
+		rdb.ZAdd(ctx, "zsetb", redis.Z{Score: 1, Member: "d"}, redis.Z{Score: 2, Member: "e"})
+		require.EqualValues(t, 3, rdb.ZCard(ctx, "zseta").Val())
+		require.EqualValues(t, 2, rdb.ZCard(ctx, "zsetb").Val())
+		resultz := rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 1, Member: "a"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 2, Member: "b"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zsetb", "zseta").Val().Z
+		require.Equal(t, redis.Z{Score: 1, Member: "d"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zsetb", "zseta").Val().Z
+		require.Equal(t, redis.Z{Score: 2, Member: "e"}, resultz)
+		resultz = rdb.BZPopMin(ctx, 0, "zseta", "zsetb").Val().Z
+		require.Equal(t, redis.Z{Score: 3, Member: "c"}, resultz)
+		var err = rdb.BZPopMin(ctx, time.Millisecond*1000, "zseta", "zsetb").Err()
+		require.Equal(t, redis.Nil, err)
+
+		rd := srv.NewTCPClient()
+		defer func() { require.NoError(t, rd.Close()) }()
+		require.NoError(t, rd.WriteArgs("bzpopmin", "zseta", "0"))
+		time.Sleep(time.Millisecond * 1000)

Review Comment:
   @Yangsx-1 Why do we need the call to `Sleep` here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1226686389


##########
src/commands/cmd_zset.cc:
##########
@@ -301,40 +450,149 @@ class CommandZMPop : public Commander {
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
+    }
+
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
+    }
+
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {
+    redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+    rocksdb::Status s;
+    std::string output;
     for (auto &user_key : keys_) {
       std::vector<MemberScore> member_scores;
-      auto s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
+      s = zset_db.Pop(user_key, count_, flag_ == ZSET_MIN, &member_scores);
       if (!s.ok()) {
-        return {Status::RedisExecErr, s.ToString()};
+        conn_->Reply(redis::Error("ERR " + s.ToString()));
+        break;
       }
       if (member_scores.empty()) {
         continue;
       }
-
-      output->append(redis::MultiLen(2));
-      output->append(redis::BulkString(user_key));
-      output->append(redis::MultiLen(member_scores.size() * 2));
+      output.append(redis::MultiLen(2));
+      output.append(redis::BulkString(user_key));
+      output.append(redis::MultiLen(member_scores.size() * 2));
       for (const auto &ms : member_scores) {
-        output->append(redis::BulkString(ms.member));
-        output->append(redis::BulkString(util::Float2String(ms.score)));
+        output.append(redis::BulkString(ms.member));
+        output.append(redis::BulkString(util::Float2String(ms.score)));
       }
-      return Status::OK();
+      reply_flag_ = true;
+      break;
     }
-    *output = redis::NilString();
-    return Status::OK();
+    if (output.empty() && !block_) {
+      output = redis::NilString();
+    }
+    conn_->Reply(output);
+    return s;
+  }
+
+  void OnWrite(bufferevent *bev) {
+    auto s = TryPopFromZset();
+    if (!s.ok() || !reply_flag_) {

Review Comment:
   Yes, you are right. There shouldn't be a `!s.ok()`. I forgot to delete it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231263442


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();

Review Comment:
   There may be some complexity because i merged zpopmin/zpopmax/bzpopmin/bzpopmax commands together.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231305839


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();

Review Comment:
   I just refer to BLPOP/BRPOP in file cmd_list.cc, so i used the Try prefix. Is there a better way to name it?
   The reply of BZPOPMIN/BZPOPMAX has an additional 'key' compared to ZPOPMIN/ZPOPMAX. I think i can merge them together so that they can use the same function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] torwig commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231314270


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();

Review Comment:
   @Yangsx-1 Now I see what "inspired" you :) 
   You can name something like `TryPopFromMultiZset` -> `PopFromMultipleZsets`, `ReplyForBZPop` -> `SendMembersWithScores`. It would be great if you don't send an error response inside the `PopFromMultipleZsets` function. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-kvrocks] Yangsx-1 commented on a diff in pull request #1490: Add the support of the BZMPOP command

Posted by "Yangsx-1 (via GitHub)" <gi...@apache.org>.
Yangsx-1 commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1231282807


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,218 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
-      if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        count_ = *parse_result;
       }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    }
+    auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-      count_ = *parse_result;
+    if (!parse_result) {
+      return {Status::RedisParseErr, "timeout is not an integer or out of range"};
     }
+
+    if (*parse_result < 0) {
+      return {Status::RedisParseErr, errTimeoutIsNegative};
+    }
+
+    timeout_ = *parse_result;
+
+    keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
     return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
+    svr_ = svr;
+    conn_ = conn;
+
+    auto s = TryPopFromMultiZset();

Review Comment:
   Ok, i understand your opinion. I'll revise it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org