You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kvrocks.apache.org by "torwig (via GitHub)" <gi...@apache.org> on 2023/06/26 10:03:50 UTC

[GitHub] [kvrocks] torwig commented on a diff in pull request #1517: Add support of new command: ZINTER / ZINTERCARD

torwig commented on code in PR #1517:
URL: https://github.com/apache/kvrocks/pull/1517#discussion_r1241940467


##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      mscores->emplace_back(MemberScore{iter.first, iter.second});
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    bool flag = false;

Review Comment:
   Give a more descriptive name to a boolean variable instead of `flag`.



##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {

Review Comment:
   You can omit to check the `!mscores->empty()` condition here because later you use range for-loop.



##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      mscores->emplace_back(MemberScore{iter.first, iter.second});
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    bool flag = false;
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      if (member_counters[ms.member] == i + 1) {
+        flag = true;
+      }
+    }
+    if (!flag) {
+      *saved_cnt = 0;
+      return rocksdb::Status::OK();
+    }
+  }
+  uint64_t count = 0;
+  if (!dst_zset.empty()) {

Review Comment:
   Same as in the previous function.



##########
src/types/redis_zset.cc:
##########
@@ -707,6 +707,112 @@ rocksdb::Status ZSet::InterStore(const Slice &dst, const std::vector<KeyWeight>
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status ZSet::Inter(const std::vector<KeyWeight> &keys_weights, AggregateMethod aggregate_method,
+                            MemberScores *mscores) {
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      double score = ms.score * keys_weights[i].weight;
+      if (std::isnan(score)) score = 0;
+      switch (aggregate_method) {
+        case kAggregateSum:
+          dst_zset[ms.member] += score;
+          if (std::isnan(dst_zset[ms.member])) {
+            dst_zset[ms.member] = 0;
+          }
+          break;
+        case kAggregateMin:
+          if (dst_zset[ms.member] > score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+        case kAggregateMax:
+          if (dst_zset[ms.member] < score) {
+            dst_zset[ms.member] = score;
+          }
+          break;
+      }
+    }
+  }
+  if (!mscores->empty()) {
+    mscores->clear();
+  }
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      mscores->emplace_back(MemberScore{iter.first, iter.second});
+    }
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status ZSet::InterCard(const std::vector<KeyWeight> &keys_weights, uint64_t limit, uint64_t *saved_cnt) {
+  if (saved_cnt) *saved_cnt = 0;
+  std::map<std::string, double> dst_zset;
+  std::map<std::string, size_t> member_counters;
+  std::vector<MemberScore> target_mscores;
+  uint64_t target_size = 0;
+  RangeScoreSpec spec;
+  auto s = RangeByScore(keys_weights[0].key, spec, &target_mscores, &target_size);
+  if (!s.ok() || target_mscores.empty()) return s;
+
+  for (const auto &ms : target_mscores) {
+    double score = ms.score * keys_weights[0].weight;
+    if (std::isnan(score)) score = 0;
+    dst_zset[ms.member] = score;
+    member_counters[ms.member] = 1;
+  }
+
+  for (size_t i = 1; i < keys_weights.size(); i++) {
+    s = RangeByScore(keys_weights[i].key, spec, &target_mscores, &target_size);
+    if (!s.ok() || target_mscores.empty()) return s;
+    bool flag = false;
+    for (const auto &ms : target_mscores) {
+      if (dst_zset.find(ms.member) == dst_zset.end()) continue;
+      member_counters[ms.member]++;
+      if (member_counters[ms.member] == i + 1) {
+        flag = true;
+      }
+    }
+    if (!flag) {
+      *saved_cnt = 0;
+      return rocksdb::Status::OK();
+    }
+  }
+  uint64_t count = 0;
+  if (!dst_zset.empty()) {
+    for (const auto &iter : dst_zset) {
+      if (member_counters[iter.first] != keys_weights.size()) continue;
+      count++;
+      LOG(INFO) << "limit : " << limit << " count : " << count;

Review Comment:
   I guess you added this line just for debugging purposes. Now you can remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@kvrocks.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org