You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kvrocks.apache.org by GitBox <gi...@apache.org> on 2022/08/09 14:46:05 UTC

[GitHub] [incubator-kvrocks] git-hulk commented on a diff in pull request #745: Add streams

git-hulk commented on code in PR #745:
URL: https://github.com/apache/incubator-kvrocks/pull/745#discussion_r941407994


##########
src/redis_cmd.cc:
##########
@@ -4750,6 +4756,883 @@ class CommandScript : public Commander {
   std::string subcommand_;
 };
 
+class CommandXAdd : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    bool entry_id_found = false;
+    stream_name_ = args[1];
+
+    for (size_t i = 2; i < args.size(); ) {
+      auto val = Util::ToLower(args[i]);
+
+      if (val == "nomkstream" && !entry_id_found) {
+        nomkstream_ = true;
+        ++i;
+        continue;
+      }
+
+      if (val == "maxlen" && !entry_id_found) {
+        if (i+1 >= args.size()) {
+          return Status(Status::RedisParseErr, errInvalidSyntax);
+        }
+
+        size_t max_len_idx;
+        bool eq_sign_found = false;
+        if (args[i+1] == "=") {
+          max_len_idx = i+2;
+          eq_sign_found = true;
+        } else {
+          max_len_idx = i+1;
+        }
+
+        if (max_len_idx >= args.size()) {
+          return Status(Status::RedisParseErr, errInvalidSyntax);
+        }
+
+        try {
+          max_len_ = std::stoull(args[max_len_idx]);
+          with_max_len_ = true;
+        } catch (const std::exception &) {
+          return Status(Status::RedisParseErr, errValueNotInterger);
+        }
+
+        i += eq_sign_found ? 3 : 2;
+        continue;
+      }
+
+      if (val == "minid" && !entry_id_found) {
+        if (i+1 >= args.size()) {
+          return Status(Status::RedisParseErr, errInvalidSyntax);
+        }
+
+        size_t min_id_idx;
+        bool eq_sign_found = false;
+        if (args[i+1] == "=") {
+          min_id_idx = i+2;
+          eq_sign_found = true;
+        } else {
+          min_id_idx = i+1;
+        }
+
+        if (min_id_idx >= args.size()) {
+          return Status(Status::RedisParseErr, errInvalidSyntax);
+        }
+
+        try {
+          auto s = ParseStreamEntryID(args[min_id_idx], &min_id_);
+          if (!s.IsOK()) {
+            return Status(Status::RedisParseErr, s.Msg());
+          }
+          with_min_id_ = true;
+        } catch (const std::exception &) {
+          return Status(Status::RedisParseErr, errValueNotInterger);
+        }
+
+        i += eq_sign_found ? 3 : 2;
+        continue;
+      }
+
+      if (val == "limit" && !entry_id_found) {
+        return Status(Status::RedisParseErr, errLimitOptionNotAllowed);
+      }
+
+      if (val == "*" && !entry_id_found) {
+        entry_id_found = true;
+        ++i;
+        continue;
+      } else if (!entry_id_found) {
+        auto s = ParseNewStreamEntryID(val, &entry_id_);
+        if (!s.IsOK()) {
+          return Status(Status::RedisParseErr, s.Msg());
+        }
+        entry_id_found = true;
+        with_entry_id_ = true;
+        ++i;
+        continue;
+      }
+
+      if (entry_id_found) {
+        name_value_pairs_.push_back(val);
+        ++i;
+      }
+    }
+
+    if (name_value_pairs_.empty() || name_value_pairs_.size() % 2 != 0) {
+      return Status(Status::RedisParseErr, errWrongNumOfArguments);
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    if (svr->IsSlave()) {

Review Comment:
   Can remove this check since we have already done it at: https://github.com/apache/incubator-kvrocks/blob/40132067bc7bb696f96675e8a396f0473321a454/src/redis_connection.cc#L407



##########
src/redis_cmd.cc:
##########
@@ -4750,6 +4756,883 @@ class CommandScript : public Commander {
   std::string subcommand_;
 };
 
+class CommandXAdd : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    bool entry_id_found = false;
+    stream_name_ = args[1];
+
+    for (size_t i = 2; i < args.size(); ) {
+      auto val = Util::ToLower(args[i]);
+
+      if (val == "nomkstream" && !entry_id_found) {
+        nomkstream_ = true;
+        ++i;
+        continue;
+      }
+
+      if (val == "maxlen" && !entry_id_found) {
+        if (i+1 >= args.size()) {
+          return Status(Status::RedisParseErr, errInvalidSyntax);
+        }
+
+        size_t max_len_idx;
+        bool eq_sign_found = false;
+        if (args[i+1] == "=") {
+          max_len_idx = i+2;
+          eq_sign_found = true;
+        } else {
+          max_len_idx = i+1;
+        }
+
+        if (max_len_idx >= args.size()) {
+          return Status(Status::RedisParseErr, errInvalidSyntax);
+        }
+
+        try {
+          max_len_ = std::stoull(args[max_len_idx]);
+          with_max_len_ = true;
+        } catch (const std::exception &) {
+          return Status(Status::RedisParseErr, errValueNotInterger);
+        }
+
+        i += eq_sign_found ? 3 : 2;
+        continue;
+      }
+
+      if (val == "minid" && !entry_id_found) {
+        if (i+1 >= args.size()) {
+          return Status(Status::RedisParseErr, errInvalidSyntax);
+        }
+
+        size_t min_id_idx;
+        bool eq_sign_found = false;
+        if (args[i+1] == "=") {
+          min_id_idx = i+2;
+          eq_sign_found = true;
+        } else {
+          min_id_idx = i+1;
+        }
+
+        if (min_id_idx >= args.size()) {
+          return Status(Status::RedisParseErr, errInvalidSyntax);
+        }
+
+        try {
+          auto s = ParseStreamEntryID(args[min_id_idx], &min_id_);
+          if (!s.IsOK()) {
+            return Status(Status::RedisParseErr, s.Msg());
+          }
+          with_min_id_ = true;
+        } catch (const std::exception &) {
+          return Status(Status::RedisParseErr, errValueNotInterger);
+        }
+
+        i += eq_sign_found ? 3 : 2;
+        continue;
+      }
+
+      if (val == "limit" && !entry_id_found) {
+        return Status(Status::RedisParseErr, errLimitOptionNotAllowed);
+      }
+
+      if (val == "*" && !entry_id_found) {
+        entry_id_found = true;
+        ++i;
+        continue;
+      } else if (!entry_id_found) {
+        auto s = ParseNewStreamEntryID(val, &entry_id_);
+        if (!s.IsOK()) {
+          return Status(Status::RedisParseErr, s.Msg());
+        }
+        entry_id_found = true;
+        with_entry_id_ = true;
+        ++i;
+        continue;
+      }
+
+      if (entry_id_found) {
+        name_value_pairs_.push_back(val);
+        ++i;
+      }
+    }
+
+    if (name_value_pairs_.empty() || name_value_pairs_.size() % 2 != 0) {
+      return Status(Status::RedisParseErr, errWrongNumOfArguments);
+    }
+
+    return Status::OK();
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    if (svr->IsSlave()) {
+      return Status(Status::NotOK, "READONLY You can't write against a read only replica");
+    }
+
+    Redis::StreamAddOptions options;
+    options.nomkstream = nomkstream_;
+    if (with_max_len_) {
+      options.trim_options.strategy = StreamTrimStrategy::MaxLen;
+      options.trim_options.max_len = max_len_;
+    }
+    if (with_min_id_) {
+      options.trim_options.strategy = StreamTrimStrategy::MinID;
+      options.trim_options.min_id = min_id_;
+    }
+    if (with_entry_id_) {
+      options.with_entry_id = true;
+      options.entry_id = entry_id_;
+    }
+
+    Redis::Stream stream_db(svr->storage_, conn->GetNamespace());
+    StreamEntryID entry_id;
+    auto s = stream_db.Add(stream_name_, options, name_value_pairs_, &entry_id);
+    if (!s.ok() && !s.IsNotFound()) {
+      return Status(Status::RedisExecErr, s.ToString());
+    }
+
+    if (s.IsNotFound() && nomkstream_) {
+      *output = Redis::NilString();
+      return Status::OK();
+    }
+
+    *output = Redis::BulkString(entry_id.ToString());
+
+    svr->OnEntryAddedToStream(conn->GetNamespace(), stream_name_, entry_id);
+
+    return Status::OK();
+  }
+
+ private:
+  std::string stream_name_;
+  uint64_t max_len_ = 0;
+  Redis::StreamEntryID min_id_;
+  Redis::NewStreamEntryID entry_id_;
+  std::vector<std::string> name_value_pairs_;
+  bool nomkstream_ = false;
+  bool with_max_len_ = false;
+  bool with_min_id_ = false;
+  bool with_entry_id_ = false;
+};
+
+class CommandXDel : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    for (size_t i = 2; i < args.size(); ++i) {
+      Redis::StreamEntryID id;
+      auto s = ParseStreamEntryID(args[i], &id);
+      if (!s.IsOK()) {
+        return s;
+      }
+      ids_.push_back(id);
+    }
+    return Status::OK();
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    if (svr->IsSlave()) {
+      return Status(Status::NotOK, "READONLY You can't write against a read only slave");
+    }

Review Comment:
   can remove as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org