You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2022/09/09 02:14:14 UTC
[incubator-kvrocks] branch unstable updated: Fix successor commands won't be processed before receiving the next read event (#839)
This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 1469746 Fix successor commands won't be processed before receiving the next read event (#839)
1469746 is described below
commit 1469746ddc9105faa4054ea95d9af61c9dce5fd9
Author: hulk <hu...@gmail.com>
AuthorDate: Fri Sep 9 10:14:08 2022 +0800
Fix successor commands won't be processed before receiving the next read event (#839)
Currently, we will stop processing commands when running into block commands
like BRPOP/BLPOP and there remained commands in the connection, but the connection
continues handling those commands since the read event was triggered.
---
src/redis_cmd.cc | 15 ++++++++++-----
tests/gocase/unit/protocol/regression_test.go | 11 +++++------
2 files changed, 15 insertions(+), 11 deletions(-)
diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc
index 52c381d..16b00e9 100644
--- a/src/redis_cmd.cc
+++ b/src/redis_cmd.cc
@@ -1635,12 +1635,13 @@ class CommandBPop : public Commander {
static void WriteCB(bufferevent *bev, void *ctx) {
auto self = reinterpret_cast<CommandBPop *>(ctx);
auto s = self->TryPopFromList();
- // if pop fail ,currently we compromised to close bpop request
if (s.IsNotFound()) {
- self->conn_->Reply(Redis::NilString());
- LOG(ERROR) << "[BPOP] Failed to execute redis command: " << self->conn_->current_cmd_->GetAttributes()->name
- << ", err: another concurrent pop request must have stole the data before this bpop request"
- << " or bpop is in a pipeline cmd list(cmd before bpop replyed trigger this writecb)";
+ // The connection may be waked up but can't pop from list. For example,
+ // connection A is blocking on list and connection B push a new element
+ // then wake up the connection A, but this element may be token by other connection C.
+ // So we need to wait for the wake event again by disabling the WRITE event.
+ bufferevent_disable(bev, EV_WRITE);
+ return;
}
if (self->timer_ != nullptr) {
event_free(self->timer_);
@@ -1650,6 +1651,10 @@ class CommandBPop : public Commander {
bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite,
Redis::Connection::OnEvent, self->conn_);
bufferevent_enable(bev, EV_READ);
+ // We need to manually trigger the read event since we will stop processing commands
+ // in connection after the blocking command, so there may have some commands to be processed.
+ // Related issue: https://github.com/apache/incubator-kvrocks/issues/831
+ bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
}
static void EventCB(bufferevent *bev, int16_t events, void *ctx) {
diff --git a/tests/gocase/unit/protocol/regression_test.go b/tests/gocase/unit/protocol/regression_test.go
index e5bb779..4a6cfc5 100644
--- a/tests/gocase/unit/protocol/regression_test.go
+++ b/tests/gocase/unit/protocol/regression_test.go
@@ -55,10 +55,9 @@ func TestRegression(t *testing.T) {
v = rdb.RPush(ctx, "handle", "a")
require.EqualValues(t, 1, v.Val())
- // TODO should read the second pushed element
- //for _, res := range resList {
- // r, err := c.ReadLine()
- // require.NoError(t, err)
- // require.Equal(t, res, r)
- //}
+ for _, res := range resList {
+ r, err := c.ReadLine()
+ require.NoError(t, err)
+ require.Equal(t, res, r)
+ }
}