You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kvrocks.apache.org by "infdahai (via GitHub)" <gi...@apache.org> on 2023/06/12 17:17:38 UTC

[GitHub] [incubator-kvrocks] infdahai commented on a diff in pull request #1490: Add the support of the BZMPOP command

infdahai commented on code in PR #1490:
URL: https://github.com/apache/incubator-kvrocks/pull/1490#discussion_r1226973166


##########
src/commands/cmd_zset.cc:
##########
@@ -220,64 +222,211 @@ class CommandZLexCount : public Commander {
   RangeLexSpec spec_;
 };
 
-class CommandZPop : public Commander {
+class CommandZPop : public Commander,
+                    private EvbufCallbackBase<CommandZPop, false>,
+                    private EventCallbackBase<CommandZPop> {
  public:
-  explicit CommandZPop(bool min) : min_(min) {}
+  explicit CommandZPop(bool min, bool block) : min_(min), block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
-    if (args.size() > 3) {
-      return {Status::RedisParseErr, errWrongNumOfArguments};
-    }
+    if (!block_) {
+      if (args.size() > 3) {
+        return {Status::RedisParseErr, errWrongNumOfArguments};
+      }
+
+      if (args.size() == 3) {
+        auto parse_result = ParseInt<int>(args[2], 10);
+        if (!parse_result) {
+          return {Status::RedisParseErr, errValueNotInteger};
+        }
+
+        count_ = *parse_result;
+      }
+      keys_.push_back(args[1]);
+      return Commander::Parse(args);
+    } else {
+      auto parse_result = ParseInt<int>(args[args.size() - 1], 10);
 
-    if (args.size() == 3) {
-      auto parse_result = ParseInt<int>(args[2], 10);
       if (!parse_result) {
-        return {Status::RedisParseErr, errValueNotInteger};
+        return {Status::RedisParseErr, "timeout is not an integer or out of range"};
+      }
+
+      if (*parse_result < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};
       }
 
-      count_ = *parse_result;
+      timeout_ = *parse_result;
+
+      keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1);
+      return Commander::Parse(args);
     }
-    return Commander::Parse(args);
   }
 
   Status Execute(Server *svr, Connection *conn, std::string *output) override {
-    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
-    std::vector<MemberScore> member_scores;
-    auto s = zset_db.Pop(args_[1], count_, min_, &member_scores);
-    if (!s.ok()) {
-      return {Status::RedisExecErr, s.ToString()};
+    svr_ = svr;
+    conn_ = conn;
+
+    if (!block_) {
+      auto s = TryPopFromZset();
+      return Status::OK();
     }
 
-    output->append(redis::MultiLen(member_scores.size() * 2));
-    for (const auto &ms : member_scores) {
-      output->append(redis::BulkString(ms.member));
-      output->append(redis::BulkString(util::Float2String(ms.score)));
+    auto bev = conn->GetBufferEvent();
+
+    auto s = TryPopFromZset();
+
+    if (!s.ok() || reply_flag_) {
+      return Status::OK();  // error has already output or result has already output
     }
 
-    return Status::OK();
+    // All Empty
+    if (conn->IsInExec()) {
+      *output = redis::MultiLen(-1);
+      return Status::OK();  // No blocking in multi-exec
+    }
+
+    for (const auto &key : keys_) {
+      svr_->BlockOnKey(key, conn_);
+    }
+
+    SetCB(bev);
+
+    if (timeout_) {
+      timer_.reset(NewTimer(bufferevent_get_base(bev)));
+      timeval tm = {timeout_, 0};
+      evtimer_add(timer_.get(), &tm);
+    }
+
+    return {Status::BlockingCmd};
+  }
+
+  rocksdb::Status TryPopFromZset() {
+    redis::ZSet zset_db(svr_->storage, conn_->GetNamespace());
+    rocksdb::Status s;
+    for (auto &user_key : keys_) {
+      std::vector<MemberScore> member_scores;
+      s = zset_db.Pop(user_key, count_, min_, &member_scores);
+      if (!s.ok()) {
+        conn_->Reply(redis::Error("ERR " + s.ToString()));
+        break;
+      }
+      if (member_scores.empty() && block_) {
+        continue;
+      }
+      std::string output;
+      if (!block_) {
+        output.append(redis::MultiLen(member_scores.size() * 2));
+      } else {
+        output.append(redis::MultiLen(member_scores.size() * 2 + 1));
+        output.append(redis::BulkString(user_key));
+      }
+      for (const auto &ms : member_scores) {
+        output.append(redis::BulkString(ms.member));
+        output.append(redis::BulkString(util::Float2String(ms.score)));
+      }
+      conn_->Reply(output);
+      reply_flag_ = true;
+      break;
+    }
+    return s;
+  }
+
+  void OnWrite(bufferevent *bev) {
+    auto s = TryPopFromZset();
+    if (!reply_flag_) {
+      // The connection may be woken up but still be unable to pop from the list. For example,
+      // connection A is blocking on a list and connection B pushes a new element,
+      // which wakes up connection A, but this element may be taken by another connection C.
+      // So we need to wait for the wake event again by disabling the WRITE event.
+      bufferevent_disable(bev, EV_WRITE);
+      return;
+    }
+
+    if (timer_) {
+      timer_.reset();
+    }
+
+    unBlockingAll();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
+    // We need to manually trigger the read event since we will stop processing commands
+    // in connection after the blocking command, so there may have some commands to be processed.
+    // Related issue: https://github.com/apache/incubator-kvrocks/issues/831
+    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
+  }
+
+  void OnEvent(bufferevent *bev, int16_t events) {
+    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) {
+      if (timer_ != nullptr) {
+        timer_.reset();
+      }
+      unBlockingAll();
+    }
+    conn_->OnEvent(bev, events);
+  }
+
+  void TimerCB(int, int16_t events) {
+    conn_->Reply(redis::NilString());
+    timer_.reset();
+    unBlockingAll();
+    auto bev = conn_->GetBufferEvent();
+    conn_->SetCB(bev);
+    bufferevent_enable(bev, EV_READ);
   }
 
  private:
   bool min_;
+  bool block_;
   int count_ = 1;
+  int timeout_;
+  std::vector<std::string> keys_;
+  Server *svr_ = nullptr;
+  Connection *conn_ = nullptr;
+  UniqueEvent timer_;
+  bool reply_flag_ = false;
+
+  void unBlockingAll() {
+    for (const auto &key : keys_) {
+      svr_->UnblockOnKey(key, conn_);
+    }
+  }
 };
 
 class CommandZPopMin : public CommandZPop {
  public:
-  CommandZPopMin() : CommandZPop(true) {}
+  CommandZPopMin() : CommandZPop(true, false) {}
 };
 
 class CommandZPopMax : public CommandZPop {
  public:
-  CommandZPopMax() : CommandZPop(false) {}
+  CommandZPopMax() : CommandZPop(false, false) {}
 };
 
-class CommandZMPop : public Commander {
+class CommandBZPopMin : public CommandZPop {
  public:
-  CommandZMPop() = default;
+  CommandBZPopMin() : CommandZPop(true, true) {}
+};
+
+class CommandBZPopMax : public CommandZPop {
+ public:
+  CommandBZPopMax() : CommandZPop(false, true) {}
+};
+
+class CommandMPop : public Commander,
+                    private EvbufCallbackBase<CommandMPop, false>,
+                    private EventCallbackBase<CommandMPop> {
+ public:
+  explicit CommandMPop(bool block) : block_(block) {}
 
   Status Parse(const std::vector<std::string> &args) override {
     CommandParser parser(args, 1);
+    if (block_) {
+      timeout_ = GET_OR_RET(parser.TakeInt<int>(NumericRange<int>{0, std::numeric_limits<int>::max()}));
+      if (timeout_ < 0) {
+        return {Status::RedisParseErr, "timeout should not be negative"};

Review Comment:
   You can use the `errTimeoutIsNegative` constant from `error_constants.h` here instead of the hard-coded error string.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org