You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2022/09/20 16:14:18 UTC
[incubator-kvrocks] branch unstable updated: Add the hrange command (#895)
This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 62daa73 Add the hrange command (#895)
62daa73 is described below
commit 62daa73f03fb9675e5cdbcb578f18017d6196864
Author: Ruixiang Tan <81...@qq.com>
AuthorDate: Wed Sep 21 00:14:11 2022 +0800
Add the hrange command (#895)
---
src/redis_cmd.cc | 55 ++++++++++++++++---
src/redis_hash.cc | 34 ++++++++++++
src/redis_hash.h | 2 +
tests/cppunit/t_hash_test.cc | 33 ++++++++++++
tests/gocase/unit/command/command_test.go | 4 +-
tests/gocase/unit/type/hash/hash_test.go | 87 +++++++++++++++++++++++++++++++
6 files changed, 206 insertions(+), 9 deletions(-)
diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc
index 8943dd0..89fb81f 100644
--- a/src/redis_cmd.cc
+++ b/src/redis_cmd.cc
@@ -23,8 +23,10 @@
#include <sys/socket.h>
#include <algorithm>
#include <cctype>
+#include <climits>
#include <cmath>
#include <chrono>
+#include <vector>
#include <thread>
#include <utility>
#include <memory>
@@ -1427,10 +1429,11 @@ class CommandHVals : public Commander {
if (!s.ok()) {
return Status(Status::RedisExecErr, s.ToString());
}
- *output = "*" + std::to_string(field_values.size()) + CRLF;
- for (const auto &fv : field_values) {
- *output += Redis::BulkString(fv.value);
+ std::vector<std::string> values;
+ for (const auto &p : field_values) {
+ values.emplace_back(p.value);
}
+ *output = MultiBulkString(values);
return Status::OK();
}
};
@@ -1444,13 +1447,50 @@ class CommandHGetAll : public Commander {
if (!s.ok()) {
return Status(Status::RedisExecErr, s.ToString());
}
- *output = "*" + std::to_string(field_values.size() * 2) + CRLF;
- for (const auto &fv : field_values) {
- *output += Redis::BulkString(fv.field);
- *output += Redis::BulkString(fv.value);
+ std::vector<std::string> kv_pairs;
+ for (const auto &p : field_values) {
+ kv_pairs.emplace_back(p.field);
+ kv_pairs.emplace_back(p.value);
+ }
+ *output = MultiBulkString(kv_pairs);
+ return Status::OK();
+ }
+};
+
+// HRANGE key start stop [LIMIT count]
+//
+// Replies with a flat multi-bulk array (field1, value1, field2, value2, ...)
+// holding the pairs that Redis::Hash::Range() returns for `key` between the
+// fields `start` and `stop`. When LIMIT is omitted the count defaults to
+// LONG_MAX, i.e. effectively unbounded.
+class CommandHRange : public Commander {
+ public:
+  // Accepts exactly 4 arguments (no LIMIT) or 6 (with "LIMIT count").
+  // Returns a parse error for a wrong argument count or a non-integer count,
+  // and an invalid-syntax error when the fifth token is not "limit"
+  // (compared case-insensitively).
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() != 4 && args.size() != 6) {
+      return Status(Status::RedisParseErr, errWrongNumOfArguments);
+    }
+    if (args.size() == 6) {
+      if (Util::ToLower(args[4]) != "limit") {
+        return Status(Status::RedisInvalidCmd, errInvalidSyntax);
+      }
+      // Read the count from the `args` parameter that was just validated, not
+      // from the `args_` member: the member is only usable here if the caller
+      // populated it before calling Parse(), which this class cannot guarantee.
+      auto parse_result = ParseInt<int64_t>(args[5], 10);
+      if (!parse_result) {
+        return Status(Status::RedisParseErr, errValueNotInterger);
+      }
+      limit_ = *parse_result;
+    }
+    return Commander::Parse(args);
+  }
+
+  // Runs the range query in the connection's namespace and serialises the
+  // result; a storage failure is reported as an execution error.
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    Redis::Hash hash_db(svr->storage_, conn->GetNamespace());
+    std::vector<FieldValue> field_values;
+    rocksdb::Status s = hash_db.Range(args_[1], args_[2], args_[3], limit_, &field_values);
+    if (!s.ok()) {
+      return Status(Status::RedisExecErr, s.ToString());
+    }
+    // Two reply elements (field and value) per returned pair.
+    std::vector<std::string> kv_pairs;
+    kv_pairs.reserve(field_values.size() * 2);
+    for (const auto &p : field_values) {
+      kv_pairs.emplace_back(p.field);
+      kv_pairs.emplace_back(p.value);
+    }
+    *output = MultiBulkString(kv_pairs);
     return Status::OK();
   }
+
+ private:
+  // Maximum number of pairs to return; stays at LONG_MAX when LIMIT is absent.
+  int64_t limit_ = LONG_MAX;
 };
class CommandPush : public Commander {
@@ -5823,6 +5863,7 @@ CommandAttributes redisCommandTable[] = {
ADD_CMD("hvals", 2, "read-only", 1, 1, 1, CommandHVals),
ADD_CMD("hgetall", 2, "read-only", 1, 1, 1, CommandHGetAll),
ADD_CMD("hscan", -3, "read-only", 1, 1, 1, CommandHScan),
+ ADD_CMD("hrange", -4, "read-only", 1, 1, 1, CommandHRange),
ADD_CMD("lpush", -3, "write", 1, 1, 1, CommandLPush),
ADD_CMD("rpush", -3, "write", 1, 1, 1, CommandRPush),
diff --git a/src/redis_hash.cc b/src/redis_hash.cc
index 36c1597..078aa2a 100644
--- a/src/redis_hash.cc
+++ b/src/redis_hash.cc
@@ -20,6 +20,7 @@
#include "redis_hash.h"
#include <utility>
+#include <algorithm>
#include <limits>
#include <cmath>
#include <iostream>
@@ -274,6 +275,39 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const std::vector<FieldValue>
return storage_->Write(rocksdb::WriteOptions(), &batch);
}
+// Collects the field/value pairs of the hash stored at `user_key` whose field
+// lies in the half-open interval [start, stop), in iteration order.
+//
+// @param user_key      key of the hash (without namespace prefix)
+// @param start         first field to include (inclusive seek position)
+// @param stop          iteration upper bound; this field itself is excluded
+// @param limit         maximum number of pairs to return; values <= 0 yield
+//                      an empty result
+// @param field_values  output vector; always cleared first
+// @return OK on success (including start >= stop, limit <= 0, or a key that
+//         does not exist); otherwise the failing status from the metadata
+//         lookup or from the iterator.
+rocksdb::Status Hash::Range(const Slice &user_key, const Slice &start, const Slice &stop,
+                            int64_t limit, std::vector<FieldValue> *field_values) {
+  field_values->clear();
+  if (start.compare(stop) >= 0 || limit <= 0) {
+    return rocksdb::Status::OK();
+  }
+  std::string ns_key;
+  AppendNamespacePrefix(user_key, &ns_key);
+  HashMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  if (!s.ok()) {
+    // A missing key is simply an empty range; any other failure must reach the
+    // caller instead of being silently turned into an empty reply.
+    return s.IsNotFound() ? rocksdb::Status::OK() : s;
+  }
+  // Never ask for more entries than the hash is recorded to contain.
+  limit = std::min(static_cast<int64_t>(metadata.size), limit);
+  std::string start_key, stop_key;
+  InternalKey(ns_key, start, metadata.version, storage_->IsSlotIdEncoded()).Encode(&start_key);
+  InternalKey(ns_key, stop, metadata.version, storage_->IsSlotIdEncoded()).Encode(&stop_key);
+  rocksdb::ReadOptions read_options;
+  LatestSnapShot ss(db_);
+  read_options.snapshot = ss.GetSnapShot();
+  // `upper_bound` must outlive the iterator because ReadOptions only stores a
+  // pointer to it.
+  rocksdb::Slice upper_bound(stop_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  read_options.fill_cache = false;
+
+  auto iter = DBUtil::UniqueIterator(db_, read_options);
+  iter->Seek(start_key);
+  // The counter has the same width as `limit`, which can exceed INT_MAX.
+  for (int64_t i = 0; iter->Valid() && i < limit; ++i) {
+    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
+    field_values->push_back(FieldValue{ikey.GetSubKey().ToString(), iter->value().ToString()});
+    iter->Next();
+  }
+  // Valid() also turns false on a read error, so surface the iterator status.
+  return iter->status();
+}
rocksdb::Status Hash::GetAll(const Slice &user_key, std::vector<FieldValue> *field_values, HashFetchType type) {
field_values->clear();
diff --git a/src/redis_hash.h b/src/redis_hash.h
index bd58e42..9667f78 100644
--- a/src/redis_hash.h
+++ b/src/redis_hash.h
@@ -52,6 +52,8 @@ class Hash : public SubKeyScanner {
rocksdb::Status IncrBy(const Slice &user_key, const Slice &field, int64_t increment, int64_t *ret);
rocksdb::Status IncrByFloat(const Slice &user_key, const Slice &field, double increment, double *ret);
rocksdb::Status MSet(const Slice &user_key, const std::vector<FieldValue> &field_values, bool nx, int *ret);
+ rocksdb::Status Range(const Slice &user_key, const Slice &start, const Slice& stop,  // scans fields in [start, stop) of the hash at user_key
+ int64_t limit, std::vector<FieldValue> *field_values);  // clears *field_values, then stores at most `limit` pairs (none if limit <= 0)
rocksdb::Status MGet(const Slice &user_key,
const std::vector<Slice> &fields,
std::vector<std::string> *values,
diff --git a/tests/cppunit/t_hash_test.cc b/tests/cppunit/t_hash_test.cc
index f1153c6..27f612b 100644
--- a/tests/cppunit/t_hash_test.cc
+++ b/tests/cppunit/t_hash_test.cc
@@ -20,6 +20,9 @@
#include <gtest/gtest.h>
#include <memory>
+#include <string>
+#include <random>
+#include <algorithm>
#include "test_base.h"
#include "redis_hash.h"
@@ -153,4 +156,34 @@ TEST_F(RedisHashTest, HIncrByFloat) {
value = std::stof(bytes);
EXPECT_FLOAT_EQ(32*1.2, value);
hash->Del(key_);
+}
+
+// Verifies that Hash::Range returns the fields in [start, stop) in sorted
+// order regardless of the order in which they were written.
+TEST_F(RedisHashTest, HRange) {
+  int ret = 0;
+  std::vector<FieldValue> fvs;
+  for (size_t i = 0; i < 4; i++) {
+    fvs.emplace_back(FieldValue{"key" + std::to_string(i), "value" + std::to_string(i)});
+  }
+  // Surround the "keyN" fields with the single-letter fields "a".."z".
+  // std::string(1, c) is used because std::to_string(char) would format the
+  // character code ("97", "98", ...) rather than the letter.
+  for (char c = 'a'; c <= 'z'; c++) {
+    fvs.emplace_back(FieldValue{std::string(1, c), std::string(1, c)});
+  }
+
+  std::random_device rd;
+  std::mt19937 g(rd());
+  std::vector<FieldValue> tmp(fvs);
+  for (size_t i = 0; i < 100; i++) {
+    // Insert in a random order; every field is new, so all of them are counted.
+    std::shuffle(tmp.begin(), tmp.end(), g);
+    rocksdb::Status s = hash->MSet(key_, tmp, false, &ret);
+    EXPECT_TRUE(s.ok() && static_cast<int>(tmp.size()) == ret);
+    // Writing the same fields again must not add anything.
+    s = hash->MSet(key_, fvs, false, &ret);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(0, ret);
+
+    std::vector<FieldValue> result;
+    s = hash->Range(key_, "key0", "key4", INT_MAX, &result);
+    EXPECT_TRUE(s.ok());
+    // ASSERT (not EXPECT) so the indexed checks below never read out of range.
+    ASSERT_EQ(4U, result.size());
+    for (size_t j = 0; j < result.size(); j++) {
+      EXPECT_EQ("key" + std::to_string(j), result[j].field);
+      EXPECT_EQ("value" + std::to_string(j), result[j].value);
+    }
+    hash->Del(key_);
+  }
 }
\ No newline at end of file
diff --git a/tests/gocase/unit/command/command_test.go b/tests/gocase/unit/command/command_test.go
index 46c4e4c..b5a87fc 100644
--- a/tests/gocase/unit/command/command_test.go
+++ b/tests/gocase/unit/command/command_test.go
@@ -35,11 +35,11 @@ func TestCommand(t *testing.T) {
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
- t.Run("Kvrocks supports 181 commands currently", func(t *testing.T) {
+ t.Run("Kvrocks supports 182 commands currently", func(t *testing.T) {
r := rdb.Do(ctx, "COMMAND", "COUNT")
v, err := r.Int()
require.NoError(t, err)
- require.Equal(t, 181, v)
+ require.Equal(t, 182, v)
})
t.Run("acquire GET command info by COMMAND INFO", func(t *testing.T) {
diff --git a/tests/gocase/unit/type/hash/hash_test.go b/tests/gocase/unit/type/hash/hash_test.go
new file mode 100644
index 0000000..349f88b
--- /dev/null
+++ b/tests/gocase/unit/type/hash/hash_test.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package hash
+
+import (
+ "context"
+ "testing"
+
+ "github.com/apache/incubator-kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/require"
+)
+
+// TestHash exercises the HRANGE command against a freshly started server:
+// range selection, the optional LIMIT clause, empty results and argument
+// validation.
+func TestHash(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+	kvArray := []string{"a", "a", "b", "b", "c", "c", "d", "d", "e", "e", "key1", "value1", "key2", "value2", "key3", "value3", "key10", "value10", "z", "z", "x", "x"}
+
+	// resetHash recreates "hashkey" with the contents of kvArray.
+	resetHash := func(t *testing.T) {
+		require.NoError(t, rdb.Del(ctx, "hashkey").Err())
+		require.NoError(t, rdb.HMSet(ctx, "hashkey", kvArray).Err())
+	}
+
+	t.Run("HRange normal situation", func(t *testing.T) {
+		resetHash(t)
+		require.EqualValues(t, []interface{}{"key1", "value1", "key10", "value10"}, rdb.Do(ctx, "HRange", "hashkey", "key1", "key2", "limit", 100).Val())
+		require.EqualValues(t, []interface{}{"key1", "value1", "key10", "value10", "key2", "value2"}, rdb.Do(ctx, "HRange", "hashkey", "key1", "key3", "limit", 100).Val())
+	})
+
+	// The LIMIT clause is optional; without it the whole range is returned.
+	t.Run("HRange without limit", func(t *testing.T) {
+		resetHash(t)
+		require.EqualValues(t, []interface{}{"key1", "value1", "key10", "value10", "key2", "value2"}, rdb.Do(ctx, "HRange", "hashkey", "key1", "key3").Val())
+	})
+
+	t.Run("HRange stop <= start", func(t *testing.T) {
+		resetHash(t)
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "key2", "key1", "limit", 100).Val())
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "key1", "key1", "limit", 100).Val())
+	})
+
+	t.Run("HRange limit", func(t *testing.T) {
+		resetHash(t)
+		require.EqualValues(t, []interface{}{"a", "a", "b", "b"}, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", 2).Val())
+		require.EqualValues(t, []interface{}{"a", "a", "b", "b", "c", "c", "d", "d", "e", "e", "key1", "value1", "key10", "value10", "key2", "value2", "key3", "value3", "x", "x", "z", "z"}, rdb.Do(ctx, "HRange", "hashkey", "a", "zzz", "limit", 10000).Val())
+	})
+
+	t.Run("HRange limit is negative", func(t *testing.T) {
+		resetHash(t)
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", -100).Val())
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", 0).Val())
+	})
+
+	t.Run("HRange nonexistent key", func(t *testing.T) {
+		require.NoError(t, rdb.Del(ctx, "hashkey").Err())
+		// Check both command forms: with and without the LIMIT clause.
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", 10000).Val())
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "a", "z").Val())
+	})
+
+	t.Run("HRange limit typo", func(t *testing.T) {
+		resetHash(t)
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limitzz", 10000).Err(), "ERR syntax")
+	})
+
+	t.Run("HRange wrong number of arguments", func(t *testing.T) {
+		resetHash(t)
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", 10000, "a").Err(), "wrong number of arguments")
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit").Err(), "wrong number of arguments")
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey", "a").Err(), "wrong number of arguments")
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey").Err(), "wrong number of arguments")
+		require.ErrorContains(t, rdb.Do(ctx, "HRange").Err(), "wrong number of arguments")
+	})
+}