You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kvrocks.apache.org by hu...@apache.org on 2022/09/20 16:14:18 UTC

[incubator-kvrocks] branch unstable updated: Add the hrange command (#895)

This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/incubator-kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 62daa73  Add the hrange command (#895)
62daa73 is described below

commit 62daa73f03fb9675e5cdbcb578f18017d6196864
Author: Ruixiang Tan <81...@qq.com>
AuthorDate: Wed Sep 21 00:14:11 2022 +0800

    Add the hrange command (#895)
---
 src/redis_cmd.cc                          | 55 ++++++++++++++++---
 src/redis_hash.cc                         | 34 ++++++++++++
 src/redis_hash.h                          |  2 +
 tests/cppunit/t_hash_test.cc              | 33 ++++++++++++
 tests/gocase/unit/command/command_test.go |  4 +-
 tests/gocase/unit/type/hash/hash_test.go  | 87 +++++++++++++++++++++++++++++++
 6 files changed, 206 insertions(+), 9 deletions(-)

diff --git a/src/redis_cmd.cc b/src/redis_cmd.cc
index 8943dd0..89fb81f 100644
--- a/src/redis_cmd.cc
+++ b/src/redis_cmd.cc
@@ -23,8 +23,10 @@
 #include <sys/socket.h>
 #include <algorithm>
 #include <cctype>
+#include <climits>
 #include <cmath>
 #include <chrono>
+#include <vector>
 #include <thread>
 #include <utility>
 #include <memory>
@@ -1427,10 +1429,11 @@ class CommandHVals : public Commander {
     if (!s.ok()) {
       return Status(Status::RedisExecErr, s.ToString());
     }
-    *output = "*" + std::to_string(field_values.size()) + CRLF;
-    for (const auto &fv : field_values) {
-      *output += Redis::BulkString(fv.value);
+    std::vector<std::string> values;
+    for (const auto &p : field_values) {
+      values.emplace_back(p.value);
     }
+    *output = MultiBulkString(values);
     return Status::OK();
   }
 };
@@ -1444,13 +1447,50 @@ class CommandHGetAll : public Commander {
     if (!s.ok()) {
       return Status(Status::RedisExecErr, s.ToString());
     }
-    *output = "*" + std::to_string(field_values.size() * 2) + CRLF;
-    for (const auto &fv : field_values) {
-      *output += Redis::BulkString(fv.field);
-      *output += Redis::BulkString(fv.value);
+    std::vector<std::string> kv_pairs;
+    for (const auto &p : field_values) {
+      kv_pairs.emplace_back(p.field);
+      kv_pairs.emplace_back(p.value);
+    }
+    *output = MultiBulkString(kv_pairs);
+    return Status::OK();
+  }
+};
+
+class CommandHRange : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    if (args.size() != 6 && args.size() != 4) {
+      return Status(Status::RedisParseErr, errWrongNumOfArguments);
+    }
+    if (args.size() == 6 && Util::ToLower(args[4]) != "limit") {
+      return Status(Status::RedisInvalidCmd, errInvalidSyntax);
     }
+    if (args.size() == 6) {
+      auto parse_result = ParseInt<int64_t>(args_[5], 10);
+      if (!parse_result)return Status(Status::RedisParseErr, errValueNotInterger);
+      limit_ = *parse_result;
+    }
+    return Commander::Parse(args);
+  }
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    Redis::Hash hash_db(svr->storage_, conn->GetNamespace());
+    std::vector<FieldValue> field_values;
+    rocksdb::Status s = hash_db.Range(args_[1], args_[2], args_[3], limit_, &field_values);
+    if (!s.ok()) {
+      return Status(Status::RedisExecErr, s.ToString());
+    }
+    std::vector<std::string> kv_pairs;
+    for (const auto &p : field_values) {
+      kv_pairs.emplace_back(p.field);
+      kv_pairs.emplace_back(p.value);
+    }
+    *output = MultiBulkString(kv_pairs);
     return Status::OK();
   }
+
+ private:
+  int64_t limit_ = LONG_MAX;
 };
 
 class CommandPush : public Commander {
@@ -5823,6 +5863,7 @@ CommandAttributes redisCommandTable[] = {
     ADD_CMD("hvals", 2, "read-only", 1, 1, 1, CommandHVals),
     ADD_CMD("hgetall", 2, "read-only", 1, 1, 1, CommandHGetAll),
     ADD_CMD("hscan", -3, "read-only", 1, 1, 1, CommandHScan),
+    ADD_CMD("hrange", -4, "read-only", 1, 1, 1, CommandHRange),
 
     ADD_CMD("lpush", -3, "write", 1, 1, 1, CommandLPush),
     ADD_CMD("rpush", -3, "write", 1, 1, 1, CommandRPush),
diff --git a/src/redis_hash.cc b/src/redis_hash.cc
index 36c1597..078aa2a 100644
--- a/src/redis_hash.cc
+++ b/src/redis_hash.cc
@@ -20,6 +20,7 @@
 
 #include "redis_hash.h"
 #include <utility>
+#include <algorithm>
 #include <limits>
 #include <cmath>
 #include <iostream>
@@ -274,6 +275,39 @@ rocksdb::Status Hash::MSet(const Slice &user_key, const std::vector<FieldValue>
   return storage_->Write(rocksdb::WriteOptions(), &batch);
 }
 
+rocksdb::Status Hash::Range(const Slice &user_key, const Slice &start, const Slice &stop,
+                            int64_t limit, std::vector<FieldValue> *field_values) {
+  field_values->clear();
+  if (start.compare(stop) >= 0 || limit <= 0) {
+    return rocksdb::Status::OK();
+  }
+  std::string ns_key;
+  AppendNamespacePrefix(user_key, &ns_key);
+  HashMetadata metadata(false);
+  rocksdb::Status s = GetMetadata(ns_key, &metadata);
+  limit = std::min(static_cast<int64_t>(metadata.size), limit);
+  std::string start_key, stop_key;
+  InternalKey(ns_key, start, metadata.version, storage_->IsSlotIdEncoded()).Encode(&start_key);
+  InternalKey(ns_key, stop, metadata.version, storage_->IsSlotIdEncoded()).Encode(&stop_key);
+  rocksdb::ReadOptions read_options;
+  LatestSnapShot ss(db_);
+  read_options.snapshot = ss.GetSnapShot();
+  rocksdb::Slice upper_bound(stop_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  read_options.fill_cache = false;
+
+  auto iter = DBUtil::UniqueIterator(db_, read_options);
+  iter->Seek(start_key);
+  for (int i = 0; iter->Valid() && i <= limit - 1; i++) {
+    FieldValue tmp_field_value;
+    InternalKey ikey(iter->key(), storage_->IsSlotIdEncoded());
+    tmp_field_value.field = ikey.GetSubKey().ToString();
+    tmp_field_value.value = iter->value().ToString();
+    field_values->emplace_back(tmp_field_value);
+    iter->Next();
+  }
+  return rocksdb::Status::OK();
+}
 rocksdb::Status Hash::GetAll(const Slice &user_key, std::vector<FieldValue> *field_values, HashFetchType type) {
   field_values->clear();
 
diff --git a/src/redis_hash.h b/src/redis_hash.h
index bd58e42..9667f78 100644
--- a/src/redis_hash.h
+++ b/src/redis_hash.h
@@ -52,6 +52,8 @@ class Hash : public SubKeyScanner {
   rocksdb::Status IncrBy(const Slice &user_key, const Slice &field, int64_t increment, int64_t *ret);
   rocksdb::Status IncrByFloat(const Slice &user_key, const Slice &field, double increment, double *ret);
   rocksdb::Status MSet(const Slice &user_key, const std::vector<FieldValue> &field_values, bool nx, int *ret);
+  rocksdb::Status Range(const Slice &user_key, const Slice &start, const Slice& stop,
+                        int64_t limit, std::vector<FieldValue> *field_values);
   rocksdb::Status MGet(const Slice &user_key,
                        const std::vector<Slice> &fields,
                        std::vector<std::string> *values,
diff --git a/tests/cppunit/t_hash_test.cc b/tests/cppunit/t_hash_test.cc
index f1153c6..27f612b 100644
--- a/tests/cppunit/t_hash_test.cc
+++ b/tests/cppunit/t_hash_test.cc
@@ -20,6 +20,9 @@
 
 #include <gtest/gtest.h>
 #include <memory>
+#include <string>
+#include <random>
+#include <algorithm>
 
 #include "test_base.h"
 #include "redis_hash.h"
@@ -153,4 +156,34 @@ TEST_F(RedisHashTest, HIncrByFloat) {
   value = std::stof(bytes);
   EXPECT_FLOAT_EQ(32*1.2, value);
   hash->Del(key_);
+}
+
+TEST_F(RedisHashTest, HRange) {
+  int ret;
+  std::vector<FieldValue> fvs;
+  for (size_t i = 0; i < 4; i++) {
+    fvs.emplace_back(FieldValue{"key" + std::to_string(i), "value" + std::to_string(i)});
+  }
+  for (size_t i = 0; i < 26; i++) {
+    fvs.emplace_back(FieldValue{std::to_string(char(i + 'a')), std::to_string(char(i + 'a'))});
+  }
+
+  std::random_device rd;
+  std::mt19937 g(rd());
+  std::vector<FieldValue> tmp(fvs);
+  for (size_t i =0; i < 100 ; i ++) {
+    std::shuffle(tmp.begin(), tmp.end(), g);
+    rocksdb::Status s = hash->MSet(key_, tmp, false, &ret);
+    EXPECT_TRUE(s.ok() && static_cast<int>(tmp.size()) == ret);
+    s = hash->MSet(key_, fvs, false, &ret);
+    EXPECT_EQ(ret ,0);
+    std::vector<FieldValue> result;
+    s = hash->Range(key_, "key0", "key4", INT_MAX, &result);
+    EXPECT_TRUE(s.ok());
+    EXPECT_EQ(4, result.size());
+    EXPECT_EQ("key0", result[0].field);
+    EXPECT_EQ("key1", result[1].field);
+    EXPECT_EQ("key2", result[2].field);
+    hash->Del(key_);
+  }
 }
\ No newline at end of file
diff --git a/tests/gocase/unit/command/command_test.go b/tests/gocase/unit/command/command_test.go
index 46c4e4c..b5a87fc 100644
--- a/tests/gocase/unit/command/command_test.go
+++ b/tests/gocase/unit/command/command_test.go
@@ -35,11 +35,11 @@ func TestCommand(t *testing.T) {
 	rdb := srv.NewClient()
 	defer func() { require.NoError(t, rdb.Close()) }()
 
-	t.Run("Kvrocks supports 181 commands currently", func(t *testing.T) {
+	t.Run("Kvrocks supports 182 commands currently", func(t *testing.T) {
 		r := rdb.Do(ctx, "COMMAND", "COUNT")
 		v, err := r.Int()
 		require.NoError(t, err)
-		require.Equal(t, 181, v)
+		require.Equal(t, 182, v)
 	})
 
 	t.Run("acquire GET command info by COMMAND INFO", func(t *testing.T) {
diff --git a/tests/gocase/unit/type/hash/hash_test.go b/tests/gocase/unit/type/hash/hash_test.go
new file mode 100644
index 0000000..349f88b
--- /dev/null
+++ b/tests/gocase/unit/type/hash/hash_test.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package hash
+
+import (
+	"context"
+	"testing"
+
+	"github.com/apache/incubator-kvrocks/tests/gocase/util"
+	"github.com/stretchr/testify/require"
+)
+
+func TestHash(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+	kvArray := []string{"a", "a", "b", "b", "c", "c", "d", "d", "e", "e", "key1", "value1", "key2", "value2", "key3", "value3", "key10", "value10", "z", "z", "x", "x"}
+	t.Run("HRange normal situation ", func(t *testing.T) {
+		require.NoError(t, rdb.Del(ctx, "hashkey").Err())
+		require.NoError(t, rdb.HMSet(ctx, "hashkey", kvArray).Err())
+		require.EqualValues(t, []interface{}{"key1", "value1", "key10", "value10"}, rdb.Do(ctx, "HRange", "hashkey", "key1", "key2", "limit", 100).Val())
+		require.EqualValues(t, []interface{}{"key1", "value1", "key10", "value10", "key2", "value2"}, rdb.Do(ctx, "HRange", "hashkey", "key1", "key3", "limit", 100).Val())
+	})
+
+	t.Run("HRange stop <= start", func(t *testing.T) {
+		require.NoError(t, rdb.Del(ctx, "hashkey").Err())
+		require.NoError(t, rdb.HMSet(ctx, "hashkey", kvArray).Err())
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "key2", "key1", "limit", 100).Val())
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "key1", "key1", "limit", 100).Val())
+	})
+
+	t.Run("HRange limit", func(t *testing.T) {
+		require.NoError(t, rdb.Del(ctx, "hashkey").Err())
+		require.NoError(t, rdb.HMSet(ctx, "hashkey", kvArray).Err())
+		require.EqualValues(t, []interface{}{"a", "a", "b", "b"}, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", 2).Val())
+		require.EqualValues(t, []interface{}{"a", "a", "b", "b", "c", "c", "d", "d", "e", "e", "key1", "value1", "key10", "value10", "key2", "value2", "key3", "value3", "x", "x", "z", "z"}, rdb.Do(ctx, "HRange", "hashkey", "a", "zzz", "limit", 10000).Val())
+	})
+
+	t.Run("HRange limit is negative", func(t *testing.T) {
+		require.NoError(t, rdb.Del(ctx, "hashkey").Err())
+		require.NoError(t, rdb.HMSet(ctx, "hashkey", kvArray).Err())
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", -100).Val())
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", 0).Val())
+	})
+
+	t.Run("HRange nonexistent key", func(t *testing.T) {
+		require.NoError(t, rdb.Del(ctx, "hashkey").Err())
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", 10000).Val())
+		require.EqualValues(t, []interface{}{}, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", 10000).Val())
+	})
+
+	t.Run("HRange limit typo", func(t *testing.T) {
+		require.NoError(t, rdb.Del(ctx, "hashkey").Err())
+		require.NoError(t, rdb.HMSet(ctx, "hashkey", kvArray).Err())
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limitzz", 10000).Err(), "ERR syntax")
+	})
+
+	t.Run("HRange wrong number of arguments", func(t *testing.T) {
+		require.NoError(t, rdb.Del(ctx, "hashkey").Err())
+		require.NoError(t, rdb.HMSet(ctx, "hashkey", kvArray).Err())
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit", 10000, "a").Err(), "wrong number of arguments")
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey", "a", "z", "limit").Err(), "wrong number of arguments")
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey", "a").Err(), "wrong number of arguments")
+		require.ErrorContains(t, rdb.Do(ctx, "HRange", "hashkey").Err(), "wrong number of arguments")
+		require.ErrorContains(t, rdb.Do(ctx, "HRange").Err(), "wrong number of arguments")
+	})
+
+}