You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kvrocks.apache.org by "IoCing (via GitHub)" <gi...@apache.org> on 2023/06/24 10:26:46 UTC

[GitHub] [kvrocks] IoCing opened a new pull request, #1517: Zinter and zintercard

IoCing opened a new pull request, #1517:
URL: https://github.com/apache/kvrocks/pull/1517

   #1457 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] torwig commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1242687148


##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,104 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+
+  for (const auto &iter : dst_zset) {
+    if (member_counters[iter.first] != keys_weights.size()) continue;
+    mscores->emplace_back(MemberScore{iter.first, iter.second});
+  }
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    // Judging whether this cycle can find the intersection

Review Comment:
   @IoCing Now you can safely remove this comment and another one at line 792.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] IoCing commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "IoCing (via GitHub)" <gi...@apache.org>.
IoCing commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1242432894


##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,109 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+
+  for (const auto &iter : dst_zset) {
+    if (member_counters[iter.first] != keys_weights.size()) continue;
+    mscores->emplace_back(MemberScore{iter.first, iter.second});
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    // Judging whether this cycle can find the intersection
+    bool flag = false;
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;

Review Comment:
   thanks,I have modified the code



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] infdahai commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "infdahai (via GitHub)" <gi...@apache.org>.
infdahai commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1243219359


##########
src/commands/cmd_zset.cc:
##########
@@ -1340,6 +1469,8 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandZAdd>("zadd", -4, "write", 1, 1, 1),
                         MakeCmdAttr<CommandZCount>("zcount", 4, "read-only", 1, 1, 1),
                         MakeCmdAttr<CommandZIncrBy>("zincrby", 4, "write", 1, 1, 1),
                         MakeCmdAttr<CommandZInterStore>("zinterstore", -4, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandZInter>("zinter", -3, "read-only", 1, 1, 1),

Review Comment:
   It seems the keyrange should be processed with `numkeys_`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] torwig commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1241940467


##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      mscores->emplace_back(MemberScore{iter.first, iter.second});
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    bool flag = false;

Review Comment:
   Give a more descriptive name to a boolean variable instead of `flag`.



##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {

Review Comment:
   You can omit to check the `!mscores->empty()` condition here because later you use range for-loop.



##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      mscores->emplace_back(MemberScore{iter.first, iter.second});
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    bool flag = false;
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      if (member_counters[ms.member] == i + 1) {
+        flag = true;
+      }
+    }
+    if (!flag) {
+      *saved_cnt = 0;
+      return rocksdb::Status::OK();
+    }
+  }
+  uint64_t count = 0;
+  if (!dst_zset.empty()) {

Review Comment:
   Same as in the previous function.



##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      mscores->emplace_back(MemberScore{iter.first, iter.second});
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    bool flag = false;
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      if (member_counters[ms.member] == i + 1) {
+        flag = true;
+      }
+    }
+    if (!flag) {
+      *saved_cnt = 0;
+      return rocksdb::Status::OK();
+    }
+  }
+  uint64_t count = 0;
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      count++;
+      LOG(INFO) << "limit : " << limit << " count : " << count;

Review Comment:
   I guess you added this line just for debugging purposes. Now you can remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add support of new command: ZINTER / ZINTERCARD [kvrocks]

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk closed pull request #1517: Add support of new command: ZINTER / ZINTERCARD
URL: https://github.com/apache/kvrocks/pull/1517


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] torwig commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1242224931


##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,109 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+
+  for (const auto &iter : dst_zset) {
+    if (member_counters[iter.first] != keys_weights.size()) continue;
+    mscores->emplace_back(MemberScore{iter.first, iter.second});
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    // Judging whether this cycle can find the intersection
+    bool flag = false;
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;

Review Comment:
   I'd recommend you to have a look at the implementation of `SINTERCARD` here https://github.com/apache/kvrocks/blob/unstable/src/types/redis_set.cc#L390 . Specifically, I want to stress using an iterator and then incrementing the value of the counter using the iterator.



##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,109 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {

Review Comment:
   Just curiosity: in what circumstances we possibly can get `NaN` here?



##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      mscores->emplace_back(MemberScore{iter.first, iter.second});
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    bool flag = false;

Review Comment:
   @IoCing Not via **comment** but use a proper **name** which will describe the **purpose** of the variable, what the variable **does**. For example, `have_intersection`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Add support of new command: ZINTER / ZINTERCARD [kvrocks]

Posted by "git-hulk (via GitHub)" <gi...@apache.org>.
git-hulk commented on PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#issuecomment-1886177121

   Has been done in #1983, close this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] IoCing commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "IoCing (via GitHub)" <gi...@apache.org>.
IoCing commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1241494978


##########
src/commands/cmd_zset.cc:
##########
@@ -1311,6 +1311,133 @@ class CommandZInterStore : public CommandZUnionStore {
   }
 };
 
+class CommandZInter : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_result = ParseInt<int>(args[1], 10);
+    if (!parse_result) {
+      return {Status::RedisParseErr, errValueNotInteger};
+    }
+
+    numkeys_ = *parse_result;
+    if (numkeys_ > args.size() - 2) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+
+    size_t j = 0;
+    while (j < numkeys_) {
+      keys_weights_.emplace_back(KeyWeight{args[j + 2], 1});
+      j++;
+    }
+
+    size_t i = 2 + numkeys_;
+    while (i < args.size()) {
+      if (util::ToLower(args[i]) == "aggregate" && i + 1 < args.size()) {
+        if (util::ToLower(args[i + 1]) == "sum") {
+          aggregate_method_ = kAggregateSum;
+        } else if (util::ToLower(args[i + 1]) == "min") {
+          aggregate_method_ = kAggregateMin;
+        } else if (util::ToLower(args[i + 1]) == "max") {
+          aggregate_method_ = kAggregateMax;
+        } else {
+          return {Status::RedisParseErr, "aggregate param error"};
+        }
+        i += 2;
+      } else if (util::ToLower(args[i]) == "weights" && i + numkeys_ < args.size()) {
+        size_t k = 0;
+        while (k < numkeys_) {
+          auto weight = ParseFloat(args[i + k + 1]);
+          if (!weight || std::isnan(*weight)) {
+            return {Status::RedisParseErr, "weight is not a double or out of range"};
+          }
+          keys_weights_[k].weight = *weight;
+          k++;
+        }
+        i += numkeys_ + 1;
+      } else if (util::ToLower(args[i]) == "withscores") {
+        with_scores_ = true;
+        i++;
+      } else {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+    }
+    return Commander::Parse(args);
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+    std::vector<MemberScore> member_scores;
+    auto s = zset_db.Inter(keys_weights_, aggregate_method_, &member_scores);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    output->append(redis::MultiLen(member_scores.size() * (with_scores_ ? 2 : 1)));
+    for (const auto &ms : member_scores) {
+      output->append(redis::BulkString(ms.member));
+      if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
+    }
+    return Status::OK();
+  }
+
+ protected:
+  size_t numkeys_ = 0;
+  std::vector<KeyWeight> keys_weights_;
+  AggregateMethod aggregate_method_ = kAggregateSum;
+  bool with_scores_ = false;
+};
+
+class CommandZInterCard : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_result = ParseInt<int>(args[1], 10);
+    if (!parse_result) {
+      return {Status::RedisParseErr, errValueNotInteger};
+    }
+
+    numkeys_ = *parse_result;
+    if (numkeys_ > args.size() - 2) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+
+    size_t j = 0;
+    while (j < numkeys_) {
+      keys_weights_.emplace_back(KeyWeight{args[j + 2], 1});
+      j++;
+    }
+
+    size_t i = 2 + numkeys_;
+    while (i < args.size()) {

Review Comment:
   Thanks,this is  a problem. and i think some parameter Parse have the same problem.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] IoCing commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "IoCing (via GitHub)" <gi...@apache.org>.
IoCing commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1244546358


##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,102 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+
+  for (const auto &iter : dst_zset) {
+    if (member_counters[iter.first] != keys_weights.size()) continue;
+    mscores->emplace_back(MemberScore{iter.first, iter.second});
+  }
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {

Review Comment:
   ok,i will check it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] torwig commented on a diff in pull request #1517: Zinter and zintercard

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1240792553


##########
src/commands/cmd_zset.cc:
##########
@@ -1311,6 +1311,133 @@ class CommandZInterStore : public CommandZUnionStore {
   }
 };
 
+class CommandZInter : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_result = ParseInt<int>(args[1], 10);
+    if (!parse_result) {
+      return {Status::RedisParseErr, errValueNotInteger};
+    }
+
+    numkeys_ = *parse_result;
+    if (numkeys_ > args.size() - 2) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+
+    size_t j = 0;
+    while (j < numkeys_) {
+      keys_weights_.emplace_back(KeyWeight{args[j + 2], 1});
+      j++;
+    }
+
+    size_t i = 2 + numkeys_;
+    while (i < args.size()) {
+      if (util::ToLower(args[i]) == "aggregate" && i + 1 < args.size()) {
+        if (util::ToLower(args[i + 1]) == "sum") {
+          aggregate_method_ = kAggregateSum;
+        } else if (util::ToLower(args[i + 1]) == "min") {
+          aggregate_method_ = kAggregateMin;
+        } else if (util::ToLower(args[i + 1]) == "max") {
+          aggregate_method_ = kAggregateMax;
+        } else {
+          return {Status::RedisParseErr, "aggregate param error"};
+        }
+        i += 2;
+      } else if (util::ToLower(args[i]) == "weights" && i + numkeys_ < args.size()) {
+        size_t k = 0;
+        while (k < numkeys_) {
+          auto weight = ParseFloat(args[i + k + 1]);
+          if (!weight || std::isnan(*weight)) {
+            return {Status::RedisParseErr, "weight is not a double or out of range"};
+          }
+          keys_weights_[k].weight = *weight;
+          k++;
+        }
+        i += numkeys_ + 1;
+      } else if (util::ToLower(args[i]) == "withscores") {
+        with_scores_ = true;
+        i++;
+      } else {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+    }
+    return Commander::Parse(args);
+  }
+
+  Status Execute(Server *svr, Connection *conn, std::string *output) override {
+    redis::ZSet zset_db(svr->storage, conn->GetNamespace());
+    std::vector<MemberScore> member_scores;
+    auto s = zset_db.Inter(keys_weights_, aggregate_method_, &member_scores);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    output->append(redis::MultiLen(member_scores.size() * (with_scores_ ? 2 : 1)));
+    for (const auto &ms : member_scores) {
+      output->append(redis::BulkString(ms.member));
+      if (with_scores_) output->append(redis::BulkString(util::Float2String(ms.score)));
+    }
+    return Status::OK();
+  }
+
+ protected:
+  size_t numkeys_ = 0;
+  std::vector<KeyWeight> keys_weights_;
+  AggregateMethod aggregate_method_ = kAggregateSum;
+  bool with_scores_ = false;
+};
+
+class CommandZInterCard : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_result = ParseInt<int>(args[1], 10);
+    if (!parse_result) {
+      return {Status::RedisParseErr, errValueNotInteger};
+    }
+
+    numkeys_ = *parse_result;
+    if (numkeys_ > args.size() - 2) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+
+    size_t j = 0;
+    while (j < numkeys_) {
+      keys_weights_.emplace_back(KeyWeight{args[j + 2], 1});
+      j++;
+    }
+
+    size_t i = 2 + numkeys_;
+    while (i < args.size()) {

Review Comment:
   What if there will be more than 1 `LIMIT` parameter? I mean, why there is a loop here? Should we just check the next element after the last key?
   `ZINTERCARD 3 key1 key2 key3 LIMIT 10 LIMIT 15 LIMIT 20` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] IoCing commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "IoCing (via GitHub)" <gi...@apache.org>.
IoCing commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1242192184


##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      mscores->emplace_back(MemberScore{iter.first, iter.second});
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    bool flag = false;
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      if (member_counters[ms.member] == i + 1) {
+        flag = true;
+      }
+    }
+    if (!flag) {
+      *saved_cnt = 0;
+      return rocksdb::Status::OK();
+    }
+  }
+  uint64_t count = 0;
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      count++;
+      LOG(INFO) << "limit : " << limit << " count : " << count;

Review Comment:
   Thanks for your comment, I have made the edits @torwig 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] IoCing commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "IoCing (via GitHub)" <gi...@apache.org>.
IoCing commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1242407090


##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,109 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {

Review Comment:
   Sorry, I don't know. I just copy the implement of interstore



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] torwig commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "torwig (via GitHub)" <gi...@apache.org>.
torwig commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1243222899


##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,102 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+
+  for (const auto &iter : dst_zset) {
+    if (member_counters[iter.first] != keys_weights.size()) continue;
+    mscores->emplace_back(MemberScore{iter.first, iter.second});
+  }
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {

Review Comment:
   To be completely sure, I'd also check if the `keys_weights` is empty and make an early return because later we use the `keys_weights[0].key` value and it will lead to a runtime error. The same could be applied to the `Inter` function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kvrocks] PragmaTwice commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

Posted by "PragmaTwice (via GitHub)" <gi...@apache.org>.
PragmaTwice commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1248425328


##########
src/commands/cmd_zset.cc:
##########
@@ -1311,6 +1311,135 @@ class CommandZInterStore : public CommandZUnionStore {
   }
 };
 
+class CommandZInter : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
+    auto parse_result = ParseInt<int>(args[1], 10);
+    if (!parse_result) {
+      return {Status::RedisParseErr, errValueNotInteger};
+    }
+
+    numkeys_ = *parse_result;
+    if (numkeys_ > args.size() - 2) {
+      return {Status::RedisParseErr, errInvalidSyntax};
+    }
+
+    size_t j = 0;
+    while (j < numkeys_) {
+      keys_weights_.emplace_back(KeyWeight{args[j + 2], 1});
+      j++;
+    }
+
+    size_t i = 2 + numkeys_;
+    while (i < args.size()) {
+      if (util::ToLower(args[i]) == "aggregate" && i + 1 < args.size()) {
+        if (util::ToLower(args[i + 1]) == "sum") {
+          aggregate_method_ = kAggregateSum;
+        } else if (util::ToLower(args[i + 1]) == "min") {
+          aggregate_method_ = kAggregateMin;
+        } else if (util::ToLower(args[i + 1]) == "max") {
+          aggregate_method_ = kAggregateMax;
+        } else {
+          return {Status::RedisParseErr, "aggregate param error"};
+        }
+        i += 2;
+      } else if (util::ToLower(args[i]) == "weights" && i + numkeys_ < args.size()) {
+        size_t k = 0;
+        while (k < numkeys_) {
+          auto weight = ParseFloat(args[i + k + 1]);
+          if (!weight || std::isnan(*weight)) {
+            return {Status::RedisParseErr, "weight is not a double or out of range"};
+          }
+          keys_weights_[k].weight = *weight;
+          k++;
+        }
+        i += numkeys_ + 1;
+      } else if (util::ToLower(args[i]) == "withscores") {
+        with_scores_ = true;
+        i++;
+      } else {
+        return {Status::RedisParseErr, errInvalidSyntax};
+      }
+    }
+    return Commander::Parse(args);

Review Comment:
   ```suggestion
       CommandParser parser(args, 1);
       numkeys_ = GET_OR_RET(parser.TakeInt<int>());
   
       for (size_t i = 0; i < numkeys_; ++i) {
         keys_weights_.emplace_back(GET_OR_RET(parser.TakeStr()), 1);
       }
   
       while (parser.Good()) {
         if (parser.EatEqICase("aggregate")) {
           if (parser.EatEqICase("sum")) {
             aggregate_method_ = kAggregateSum;
           } else if (parser.EatEqICase("min")) {
             aggregate_method_ = kAggregateMin;
           } else if (parser.EatEqICase("max")) {
             aggregate_method_ = kAggregateMax;
           } else {
             return {Status::RedisParseErr, "aggregate param error"};
           }
         } else if (parser.EatEqICase("weights")) {
           for (size_t i = 0; i < numkeys_; ++i) {
             auto weight = parser.TakeFloat();
             if (!weight || std::isnan(*weight)) {
               return {Status::RedisParseErr, "weight is not a double or out of range"};
             }
             keys_weights_[i].weight = *weight;
           }
         } else if (parser.EatEqICase("withscores")) {
           with_scores_ = true;
         } else {
           return {Status::RedisParseErr, errInvalidSyntax};
         }
       }
       return Status::OK();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org