You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2022/09/09 02:14:14 UTC

[incubator-kvrocks] branch unstable updated: Fix successor commands won't be processed before receiving the next read event (#839)

This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 1469746  Fix successor commands won't be processed before receiving the next read event (#839)
1469746 is described below

commit 1469746ddc9105faa4054ea95d9af61c9dce5fd9
Author: hulk <hu...@gmail.com>
AuthorDate: Fri Sep 9 10:14:08 2022 +0800

    Fix successor commands won't be processed before receiving the next read event (#839)
    
    Currently, we stop processing commands when running into blocking commands
    like BRPOP/BLPOP, even if there are remaining commands in the connection, and the
    connection only continues handling those commands once the next read event is triggered.
---
 src/redis_cmd.cc                              | 15 ++++++++++-----
 tests/gocase/unit/protocol/regression_test.go | 11 +++++------
 2 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc
index 52c381d..16b00e9 100644
--- a/src/redis_cmd.cc
+++ b/src/redis_cmd.cc
@@ -1635,12 +1635,13 @@ class CommandBPop : public Commander {
   static void WriteCB(bufferevent *bev, void *ctx) {
     auto self = reinterpret_cast<CommandBPop *>(ctx);
     auto s = self->TryPopFromList();
-    // if pop fail ,currently we compromised to close bpop request
     if (s.IsNotFound()) {
-      self->conn_->Reply(Redis::NilString());
-      LOG(ERROR) << "[BPOP] Failed to execute redis command: " << self->conn_->current_cmd_->GetAttributes()->name
-                 << ", err: another concurrent pop request must have stole the data before this bpop request"
-                 << " or bpop is in a pipeline cmd list(cmd before bpop replyed trigger this writecb)";
+      // The connection may be waked up but can't pop from list. For example,
+      // connection A is blocking on list and connection B push a new element
+      // then wake up the connection A, but this element may be token by other connection C.
+      // So we need to wait for the wake event again by disabling the WRITE event.
+      bufferevent_disable(bev, EV_WRITE);
+      return;
     }
     if (self->timer_ != nullptr) {
       event_free(self->timer_);
@@ -1650,6 +1651,10 @@ class CommandBPop : public Commander {
     bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite,
                       Redis::Connection::OnEvent, self->conn_);
     bufferevent_enable(bev, EV_READ);
+    // We need to manually trigger the read event since we will stop processing commands
+    // in connection after the blocking command, so there may have some commands to be processed.
+    // Related issue: https://github.com/apache/incubator-kvrocks/issues/831
+    bufferevent_trigger(bev, EV_READ, BEV_TRIG_IGNORE_WATERMARKS);
   }
 
   static void EventCB(bufferevent *bev, int16_t events, void *ctx) {
diff --git a/tests/gocase/unit/protocol/regression_test.go b/tests/gocase/unit/protocol/regression_test.go
index e5bb779..4a6cfc5 100644
--- a/tests/gocase/unit/protocol/regression_test.go
+++ b/tests/gocase/unit/protocol/regression_test.go
@@ -55,10 +55,9 @@ func TestRegression(t *testing.T) {
 	v = rdb.RPush(ctx, "handle", "a")
 	require.EqualValues(t, 1, v.Val())
 
-	// TODO should read the second pushed element
-	//for _, res := range resList {
-	//	r, err := c.ReadLine()
-	//	require.NoError(t, err)
-	//	require.Equal(t, res, r)
-	//}
+	for _, res := range resList {
+		r, err := c.ReadLine()
+		require.NoError(t, err)
+		require.Equal(t, res, r)
+	}
 }