You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/09/13 19:31:32 UTC

[GitHub] [geode] nonbinaryprogrammer opened a new pull request #6861: GEODE-9516: add ZINTERSTORE command

nonbinaryprogrammer opened a new pull request #6861:
URL: https://github.com/apache/geode/pull/6861


   ZINTERSTORE command takes the intersection of N sorted sets and stores
   the results, either summing, taking the max, or taking the min of all
   the scores for each member. The provided destination key will be
   overridden if it already exists.
   
   <!-- Thank you for submitting a contribution to Apache Geode. -->
   
   <!-- In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken: 
   -->
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `develop`)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   - [ ] Does `gradlew build` run cleanly?
   
   - [ ] Have you written or updated unit tests to verify your changes?
   
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   
   <!-- Note:
   Please ensure that once the PR is submitted, check Concourse for build issues and
   submit an update to your PR as soon as possible. If you need help, please send an
   email to dev@geode.apache.org.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r717945795



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -164,7 +174,7 @@ public long zrevrank(RedisKey key, byte[] member) {
   @Override
   public long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
       ZAggregator aggregator) {
-    List<RedisKey> keysToLock = new ArrayList<>(keyWeights.size());
+    List<RedisKey> keysToLock = getKeysToLock(destinationKey, keyWeights);

Review comment:
       thank you




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715231251



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +289,54 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return byteIncr;
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList());
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.member);
+        if (existingValue == null) {
+          double score;
+          // Redis math and Java math are different when handling infinity. Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            score = 0;
+          } else if (weight == 1) {
+            score = entry.getScore();
+          } else if (Double.isInfinite(weight) && entry.score == 0D) {
+            score = 0D;
+          } else {
+            double newScore = entry.score * weight;
+            if (Double.isNaN(newScore)) {
+              throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);

Review comment:
       I can handle this case specifically, but I don't like it. with the special case of `-infinity + infinity = 0`, this sequence of events `-infinity + infinity + 3` is not equal to `-infinity + 3 + infinity`, which means that the result of our sum will be dependent on the ordering of the sorted sets we're looking through. But that's a problem that native redis has, so we must as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r717958442



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_KEY_REQUIRED;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.narrowLongToInt;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+    ListIterator<byte[]> argIterator = commandElements.listIterator();
+    RedisResponse syntaxErrorResponse = RedisResponse.error(ERROR_SYNTAX);
+
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    // get the number of keys
+    int numKeys;
+    try {
+      numKeys = narrowLongToInt(Coder.bytesToLong(argIterator.next()));
+      if (numKeys > commandElements.size() - 2) {
+        return syntaxErrorResponse;
+      } else if (numKeys <= 0) {
+        return RedisResponse.error(ERROR_KEY_REQUIRED);
+      }
+    } catch (NumberFormatException ex) {
+      return syntaxErrorResponse;
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>(numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+    byte[] argument;
+
+    // get all the keys
+    for (int i = 0; i < numKeys; i++) {
+      if (!argIterator.hasNext()) {
+        return syntaxErrorResponse;
+      }
+      argument = argIterator.next();
+      if (Arrays.equals(toUpperCaseBytes(argument), bWEIGHTS)
+          || Arrays.equals(toUpperCaseBytes(argument), bAGGREGATE)) {
+        return syntaxErrorResponse;
+      }

Review comment:
       good point, thank you.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r718019879



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -175,10 +174,7 @@ public long zrevrank(RedisKey key, byte[] member) {
   public long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
       ZAggregator aggregator) {
     List<RedisKey> keysToLock = getKeysToLock(destinationKey, keyWeights);
-    for (ZKeyWeight kw : keyWeights) {
-      getRegionProvider().ensureKeyIsLocal(kw.getKey());
-      keysToLock.add(kw.getKey());
-    }
+
     getRegionProvider().ensureKeyIsLocal(destinationKey);
     keysToLock.add(destinationKey);

Review comment:
       These two lines are also duplicated in the `getKeysToLock()`  method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r714285055



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +625,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());
+
+    for (RedisSortedSet set : sets) {

Review comment:
       oh, yeah that's much better. I meant to change that and forgot.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r717879843



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member);
+      } else {
+        newScore = getMinScoreForMember(sets, member);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {

Review comment:
       Yes, I believe it is now unnecessary because the ZAggregator function that is applied doesn't ever return null.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] dschneider-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
dschneider-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r716934456



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member);
+      } else {
+        newScore = getMinScoreForMember(sets, member);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {
+          throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+        }
+        retVal.memberAdd(member, newScore);
+      }
+    }
+  }
+
+  private Double getSumOfScoresForMember(List<RedisSortedSet> sets, byte[] memberName) {
+    OrderedSetEntry member;
+    double runningTotal = 0;
+
+    for (RedisSortedSet set : sets) {
+      if ((member = set.members.get(memberName)) != null) {
+        if (Double.isInfinite(runningTotal) && member.getScore() == -runningTotal) {
+          runningTotal = 0;
+        } else {
+          runningTotal += member.getScore();
+        }
+      } else {
+        return null;
+      }
+    }
+
+    this.memberAdd(memberName, runningTotal);
+    return runningTotal;
+  }
+
+  private Double getMaxScoreForMember(List<RedisSortedSet> sets, byte[] member) {
+    double runningMax = Double.MIN_VALUE;
+    for (RedisSortedSet set : sets) {
+      if (set.members.containsKey(member)) {

Review comment:
       You had a question about an old comment I made about containsKey. In this code instead of calling members.containsKey on line 724 followed by members.get on line 725 you could just call members.get and test for null. That way you do one map lookup instead of two.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r718019879



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -175,10 +174,7 @@ public long zrevrank(RedisKey key, byte[] member) {
   public long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
       ZAggregator aggregator) {
     List<RedisKey> keysToLock = getKeysToLock(destinationKey, keyWeights);
-    for (ZKeyWeight kw : keyWeights) {
-      getRegionProvider().ensureKeyIsLocal(kw.getKey());
-      keysToLock.add(kw.getKey());
-    }
+
     getRegionProvider().ensureKeyIsLocal(destinationKey);
     keysToLock.add(destinationKey);

Review comment:
       These two lines are also duplicated in the `getKeysToLock()`  method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715195232



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {

Review comment:
       order is not enforced and multiple aggregates is valid and only the last one is applied. good call on the boolean




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715811939



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            keyWeights.get(i).setWeight(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (keyWeights.size() != numKeys) {

Review comment:
       without this check we accept fewer keys than we specified. i.e `zinterstore 3 set1 set2`. The test that fails when this check is removed is `AbstractZInterStoreIntegrationTest.shouldError_givenNumKeysTooLarge`. the test passes on native redis




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r717879843



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member);
+      } else {
+        newScore = getMinScoreForMember(sets, member);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {

Review comment:
       Yes, I believe it is now unnecessary because the ZAggregator function that is applied doesn't ever return null.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715912797



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZInterStoreIntegrationTest.java
##########
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import org.assertj.core.data.Offset;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.ZParams;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+
+public abstract class AbstractZInterStoreIntegrationTest implements RedisIntegrationTest {
+
+  private static final String NEW_SET = "{user1}new";
+  private static final String KEY1 = "{user1}sset1";
+  private static final String KEY2 = "{user1}sset2";
+  private static final String KEY3 = "{user1}sset3";
+
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void shouldError_givenTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZINTERSTORE, 3);
+  }
+
+  @Test
+  public void shouldError_givenWrongKeyType() {
+    final String STRING_KEY = "{user1}stringKey";
+    jedis.set(STRING_KEY, "value");
+    assertThatThrownBy(() -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2",
+        STRING_KEY, KEY1)).hasMessage("WRONGTYPE " + RedisConstants.ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void shouldError_givenSetsCrossSlots() {
+    final String WRONG_KEY = "{user2}another";
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2", WRONG_KEY,
+            KEY1)).hasMessage("CROSSSLOT " + RedisConstants.ERROR_WRONG_SLOT);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooLarge() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2", KEY1))
+            .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooSmall() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1", KEY1, KEY2))
+            .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooManyWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHTS", "2", "3")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooFewWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2",
+            KEY1, KEY2, "WEIGHTS", "1")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenWeightNotAFloat() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHTS", "not-a-number"))
+                .hasMessage("ERR " + RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT);
+  }
+
+  @Test
+  public void shouldError_givenWeightsWithoutAnyValues() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHTS")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenMultipleWeightKeywords() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHT", "1.0", "WEIGHT", "2.0"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenUnknownAggregate() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "AGGREGATE", "UNKNOWN", "WEIGHTS", "1"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenAggregateKeywordWithoutValue() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "AGGREGATE")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenMultipleAggregates() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHTS", "1", "AGGREGATE", "SUM", "MIN"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfOne_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = convertToTuples(scores, (ignore, value) -> value);
+    jedis.zadd(KEY1, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(1), KEY1))
+        .isEqualTo(expectedResults.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfZero_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = convertToTuples(scores, (ignore, value) -> 0D);
+    jedis.zadd(KEY1, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(0), KEY1))
+        .isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfPositiveInfinity_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults =
+        convertToTuples(scores, (ignore, value) -> value > 0 ? Double.POSITIVE_INFINITY : value);
+    jedis.zadd(KEY1, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(Double.POSITIVE_INFINITY),
+        KEY1)).isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfNegativeInfinity_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    jedis.zadd(KEY1, scores);
+
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player4", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", Double.NEGATIVE_INFINITY));
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(Double.NEGATIVE_INFINITY),
+        KEY1)).isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfN_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    jedis.zadd(KEY1, scores);
+
+    double multiplier = 2.71D;
+
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", multiplier));
+    expectedResults.add(new Tuple("player4", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", 3.2D * multiplier));
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(multiplier), KEY1))
+        .isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenMultipleRedisSortedSets() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", 2D));
+    expectedResults.add(new Tuple("player4", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", 3.2D * 2));
+
+    jedis.zadd(KEY1, scores);
+    jedis.zadd(KEY2, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams(), KEY1, KEY2))
+        .isEqualTo(expectedResults.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenTwoRedisSortedSets_withDifferentWeights() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", 3D));
+    expectedResults.add(new Tuple("player4", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", 3.2D * 3));
+
+    jedis.zadd(KEY1, scores);
+    jedis.zadd(KEY2, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(1, 2), KEY1, KEY2))
+        .isEqualTo(expectedResults.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenMultipleIdenticalRedisSortedSets_withDifferentPositiveWeights() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", 4.5D));
+    expectedResults.add(new Tuple("player4", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", 3.2D * 4.5D));
+
+    jedis.zadd(KEY1, scores);
+    jedis.zadd(KEY2, scores);
+    jedis.zadd(KEY3, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(1D, 2D, 1.5D), KEY1, KEY2, KEY3))
+        .isEqualTo(expectedResults.size());
+
+    Set<Tuple> actualResults = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThatActualScoresAreVeryCloseToExpectedScores(expectedResults, actualResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenOneSetDoesNotExist() {
+    Map<String, Double> scores = buildMapOfMembersAndScores(1, 10);
+    Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+    jedis.zadd(KEY1, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, KEY1, KEY2)).isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expectedResults, results);
+  }
+
+  @Test
+  public void shouldStoreNothingAtDestinationKey_givenTwoNonIntersectingSets() {
+    Map<String, Double> scores = buildMapOfMembersAndScores(1, 5);
+    Map<String, Double> nonIntersectionScores = buildMapOfMembersAndScores(6, 10);
+    jedis.zadd(KEY1, scores);
+    jedis.zadd(KEY2, nonIntersectionScores);
+
+    assertThat(jedis.zinterstore(NEW_SET, KEY1, KEY2)).isZero();
+
+    assertThat(jedis.zrangeWithScores(NEW_SET, 0, 10)).isEmpty();
+  }
+
+  @Test
+  public void shouldStoreSumOfIntersection_givenThreePartiallyOverlappingSets() {
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.SUM),
+        KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleSumOfScores("player" + i, scores1, scores2, scores3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMaxOfIntersection_givenThreePartiallyOverlappingSets() {
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MAX),
+        KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleMaxOfScores("player" + i, scores1, scores2, scores3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMinOfIntersection_givenThreePartiallyOverlappingSets() {
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MIN),
+        KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleMinOfScores("player" + i, scores1, scores2, scores3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreSumOfIntersection_givenThreePartiallyOverlappingSets_andWeights() {
+    double weight1 = 0D;
+    double weight2 = 42D;
+    double weight3 = -7.3D;
+
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    ZParams zParams = new ZParams().aggregate(ZParams.Aggregate.SUM)
+        .weights(weight1, weight2, weight3);
+
+    assertThat(jedis.zinterstore(NEW_SET, zParams, KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleSumOfScoresWithWeights("player" + i, scores1, scores2, scores3, weight1,
+          weight2, weight3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMaxOfIntersection_givenThreePartiallyOverlappingSets_andWeights() {
+    double weight1 = 0D;
+    double weight2 = 42D;
+    double weight3 = -7.3D;
+
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    ZParams zParams = new ZParams().aggregate(ZParams.Aggregate.MAX)
+        .weights(weight1, weight2, weight3);
+
+    assertThat(jedis.zinterstore(NEW_SET, zParams, KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleMaxOfScoresWithWeights("player" + i, scores1, scores2, scores3, weight1,
+          weight2, weight3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMinOfIntersection_givenThreePartiallyOverlappingSets_andWeights() {
+    double weight1 = 0D;
+    double weight2 = 42D;
+    double weight3 = -7.3D;
+
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    ZParams zParams = new ZParams().aggregate(ZParams.Aggregate.MIN)
+        .weights(weight1, weight2, weight3);
+
+    assertThat(jedis.zinterstore(NEW_SET, zParams, KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleMinOfScoresWithWeights("player" + i, scores1, scores2, scores3, weight1,
+          weight2, weight3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMaxOfIntersection_givenThreePartiallyOverlappingSetsWithIdenticalScores() {
+    double score = 3.141592;
+    Map<String, Double> scores1 = buildMapOfMembersAndIdenticalScores(1, 10, score);
+    Map<String, Double> scores2 = buildMapOfMembersAndIdenticalScores(6, 13, score);
+    Map<String, Double> scores3 = buildMapOfMembersAndIdenticalScores(4, 11, score);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    ZParams zParams = new ZParams().aggregate(ZParams.Aggregate.MAX);
+
+    assertThat(jedis.zinterstore(NEW_SET, zParams, KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(new Tuple("player" + i, score));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMinOfIntersection_givenThreePartiallyOverlappingSetsWithIdenticalScores() {
+    double score = 3.141592;
+    Map<String, Double> scores1 = buildMapOfMembersAndIdenticalScores(1, 10, score);
+    Map<String, Double> scores2 = buildMapOfMembersAndIdenticalScores(6, 13, score);
+    Map<String, Double> scores3 = buildMapOfMembersAndIdenticalScores(4, 11, score);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    ZParams zParams = new ZParams().aggregate(ZParams.Aggregate.MAX);
+
+    assertThat(jedis.zinterstore(NEW_SET, zParams, KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(new Tuple("player" + i, score));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreIntersectionUsingLastAggregate_givenMultipleAggregateKeywords() {
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 15);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(9, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(12, 18);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 12; i <= 13; i++) {
+      expected.add(tupleMaxOfScores("player" + i, scores1, scores2, scores3));
+    }
+
+    jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "3",
+        KEY1, KEY2, KEY3, "AGGREGATE", "MIN", "AGGREGATE", "MAX");
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, results);
+  }
+
+  @Test
+  public void shouldStoreIntersection_whenTargetExistsAndSetsAreDuplicated() {
+    Map<String, Double> scores = buildMapOfMembersAndScores(0, 10);
+    jedis.zadd(KEY1, scores);
+    jedis.zadd(KEY2, scores);
+
+    Set<Tuple> expectedResults = convertToTuples(scores, (ignore, score) -> score * 2);
+
+    // destination key is a key that exists
+    assertThat(jedis.zinterstore(KEY1, KEY1, KEY2)).isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(KEY1, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expectedResults, results);
+  }
+
+  @Test
+  public void ensureSetConsistency_andNoExceptions_whenRunningConcurrently() {
+    int scoreCount = 1000;
+    jedis.zadd("{A}ones", buildMapOfMembersAndScores(0, scoreCount - 1));
+
+    jedis.zadd("{A}scores1", buildMapOfMembersAndScores(0, scoreCount - 1));
+    jedis.zadd("{A}scores2", buildMapOfMembersAndScores(0, scoreCount - 1));
+    jedis.zadd("{A}scores3", buildMapOfMembersAndScores(0, scoreCount - 1));
+
+    new ConcurrentLoopingThreads(1000,
+        i -> jedis.zadd("{A}scores1", (double) i, "player" + i),
+        i -> jedis.zadd("{A}scores2", (double) i, "player" + i),
+        i -> jedis.zadd("{A}scores3", (double) i, "player" + i),
+        i -> jedis.zinterstore("{A}maxSet", new ZParams().aggregate(ZParams.Aggregate.MAX),
+            "{A}scores1", "{A}scores2", "{A}scores3"),
+        // This ensures that the lock ordering for keys is working
+        i -> jedis.zinterstore("{A}minSet", new ZParams().aggregate(ZParams.Aggregate.MIN),
+            "{A}scores1", "{A}scores2", "{A}scores3"))
+                .runWithAction(() -> {
+                  assertThat(jedis.zrangeWithScores("{A}maxSet", 0, scoreCount))
+                      .hasSize(scoreCount);
+                  assertThat(jedis.zrangeWithScores("{A}minSet", 0, scoreCount))
+                      .hasSize(scoreCount);
+                });
+  }
+
+  /************* Helper Methods *************/
+
+  private Map<String, Double> buildMapOfMembersAndScores() {
+    Map<String, Double> scores = new LinkedHashMap<>();
+    scores.put("player1", Double.NEGATIVE_INFINITY);
+    scores.put("player2", 0D);
+    scores.put("player3", 1D);
+    scores.put("player4", Double.POSITIVE_INFINITY);
+    scores.put("player5", 3.2D);
+    return scores;
+  }
+
+  private Map<String, Double> buildMapOfMembersAndScores(int start, int end) {
+    Map<String, Double> scores = new LinkedHashMap<>();
+    Random random = new Random();
+
+    for (int i = start; i <= end; i++) {
+      scores.put("player" + i, random.nextDouble());
+    }
+
+    return scores;
+  }
+
+  private Map<String, Double> buildMapOfMembersAndIdenticalScores(int start, int end,
+      double score) {
+    Map<String, Double> scores = new LinkedHashMap<>();
+
+    for (int i = start; i <= end; i++) {
+      scores.put("player" + i, score);
+    }
+
+    return scores;
+  }
+
+  Tuple tupleSumOfScores(String memberName, Map<String, Double> scores1,
+      Map<String, Double> scores2, Map<String, Double> scores3) {
+    return tupleSumOfScoresWithWeights(memberName, scores1, scores2, scores3, 1, 1, 1);
+  }
+
+  Tuple tupleSumOfScoresWithWeights(String memberName, Map<String, Double> scores1,
+      Map<String, Double> scores2, Map<String, Double> scores3, double weight1, double weight2,
+      double weight3) {
+    return new Tuple(memberName, scores1.get(memberName) * weight1
+        + scores2.get(memberName) * weight2
+        + scores3.get(memberName) * weight3);
+  }
+
+  Tuple tupleMaxOfScores(String memberName, Map<String, Double> scores1,
+      Map<String, Double> scores2, Map<String, Double> scores3) {
+    return tupleMaxOfScoresWithWeights(memberName, scores1, scores2, scores3, 1, 1, 1);
+  }
+
+  Tuple tupleMaxOfScoresWithWeights(String memberName, Map<String, Double> scores1,
+      Map<String, Double> scores2, Map<String, Double> scores3, double weight1, double weight2,
+      double weight3) {
+    return new Tuple(memberName, max(max(scores1.get(memberName) * weight1,
+        scores2.get(memberName) * weight2), scores3.get(memberName) * weight3));
+  }
+
+  Tuple tupleMinOfScores(String memberName, Map<String, Double> scores1,
+      Map<String, Double> scores2, Map<String, Double> scores3) {
+    return tupleMinOfScoresWithWeights(memberName, scores1, scores2, scores3, 1, 1, 1);
+  }
+
+  Tuple tupleMinOfScoresWithWeights(String memberName, Map<String, Double> scores1,
+      Map<String, Double> scores2, Map<String, Double> scores3, double weight1, double weight2,
+      double weight3) {
+    return new Tuple(memberName, min(min(scores1.get(memberName) * weight1,
+        scores2.get(memberName) * weight2), scores3.get(memberName) * weight3));
+  }
+
+  private Set<Tuple> convertToTuples(Map<String, Double> map,
+      BiFunction<Integer, Double, Double> function) {
+    Set<Tuple> tuples = new LinkedHashSet<>();
+    int x = 0;
+    for (Map.Entry<String, Double> e : map.entrySet()) {
+      tuples.add(new Tuple(e.getKey().getBytes(), function.apply(x++, e.getValue())));
+    }
+
+    return tuples;
+  }
+
+  private void assertThatActualScoresAreVeryCloseToExpectedScores(
+      Set<Tuple> expectedResults, Set<Tuple> results) {
+    for (Tuple expectedResult : expectedResults) {

Review comment:
       I think this method should also be testing whether all members in `expectedResults` are also exactly what is in `results`. As it is now, passing in `[a, b, c]` and `[d, e, f]` as members would never cause the assertion to be checked.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r714316558



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZInterStoreExecutor.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZInterStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>((int) numKeys);
+    List<Double> weights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!weights.isEmpty()) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            weights.add(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));

Review comment:
       No, Iterator.next() throws a NoSubElementException when there isn't a next element, which we subsequently catch and send a syntax error as the response. Does that seem sufficient?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715063387



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +289,54 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return byteIncr;
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList());
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.member);
+        if (existingValue == null) {
+          double score;
+          // Redis math and Java math are different when handling infinity. Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            score = 0;
+          } else if (weight == 1) {
+            score = entry.getScore();
+          } else if (Double.isInfinite(weight) && entry.score == 0D) {
+            score = 0D;
+          } else {
+            double newScore = entry.score * weight;
+            if (Double.isNaN(newScore)) {
+              throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);

Review comment:
       I get that native redis doesn't have the same behavior, but in Java positive infinity plus negative infinity results in NaN, not 0

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +627,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());

Review comment:
       I made some changes to this area of the code, so please take a look and we can talk more about it, if you don't mind.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer merged pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer merged pull request #6861:
URL: https://github.com/apache/geode/pull/6861


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715895087



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +288,49 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return Coder.doubleToBytes(score);
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList(), new double[] {});
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {

Review comment:
       Sorry for the late comments and also if this has already been discussed, but it seems like we're potentially doing a lot of work here and creating a lot of allocations for sets that are just going to be tossed away. Would it be possible to first determine whether a member is intersecting and then perform any aggregation/weighting? Essentially what is already happening in `getIntersection` but include the weighting there instead of this loop. I like how that method is picking the smallest set to iterate over as the base.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715187655



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +650,84 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;

Review comment:
       it needs to be nullable




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] dschneider-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
dschneider-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715110221



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +288,49 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return Coder.doubleToBytes(score);
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList(), new double[] {});
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.getMember());
+        if (existingValue == null) {
+          double score;
+          // Redis math and Java math are different when handling infinity. Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            score = 0;
+          } else if (weight == 1) {
+            score = entry.getScore();
+          } else if (Double.isInfinite(weight) && entry.score == 0D) {

Review comment:
       I think entry.score should be entry.getScore(). Should we make those fields private on AbstractOrderedSetEntry to force the use of the gettor?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +288,49 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return Coder.doubleToBytes(score);
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList(), new double[] {});
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.getMember());

Review comment:
       since you never really use "existingValue" it seems like it might be better to
   change these two lines to just be:
   if (!members.contains(entry.getMember())

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +288,49 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return Coder.doubleToBytes(score);
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList(), new double[] {});

Review comment:
       it seems kind of odd to create this with empty lists and then turn right around add up to set.size() items.
   I think it would be good to add a RedisSortedSet(int size) {
     this.members = new MemberMap(size);
   } constructor
   and use that constructor here

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +650,84 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;

Review comment:
       I think this can be "double" instead of "Double"

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +650,84 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member, retVal);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member, retVal);
+      } else {
+        newScore = getMinScoreForMember(sets, member, retVal);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {
+          throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+        }
+        retVal.memberAdd(member, newScore);
+      }
+    }
+    return retVal;
+  }
+
+  private Double getSumOfScoresForMember(List<RedisSortedSet> sets, byte[] member,
+      RedisSortedSet retVal) {
+    double runningTotal = 0;
+    for (RedisSortedSet set : sets) {
+      if (set.members.containsKey(member)) {
+        runningTotal += set.members.get(member).score;
+      } else {
+        return null;
+      }
+    }
+    retVal.memberAdd(member, runningTotal);
+    return runningTotal;
+  }
+
+  private Double getMaxScoreForMember(List<RedisSortedSet> sets, byte[] member,

Review comment:
       change "Double" to "double"

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +288,49 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return Coder.doubleToBytes(score);
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList(), new double[] {});
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.getMember());
+        if (existingValue == null) {
+          double score;
+          // Redis math and Java math are different when handling infinity. Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            score = 0;
+          } else if (weight == 1) {
+            score = entry.getScore();
+          } else if (Double.isInfinite(weight) && entry.score == 0D) {
+            score = 0D;
+          } else {
+            score = entry.score * weight;
+          }
+          weightedSet.memberAdd(entry.member, score);

Review comment:
       use getMember()

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +288,49 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return Coder.doubleToBytes(score);
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList(), new double[] {});
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.getMember());
+        if (existingValue == null) {
+          double score;
+          // Redis math and Java math are different when handling infinity. Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            score = 0;
+          } else if (weight == 1) {
+            score = entry.getScore();
+          } else if (Double.isInfinite(weight) && entry.score == 0D) {
+            score = 0D;
+          } else {
+            score = entry.score * weight;
+          }
+          weightedSet.memberAdd(entry.member, score);
+        }
+      }
+      sets.add(weightedSet);
+    }
+
+    RedisSortedSet intersection = getIntersection(sets, aggregator);

Review comment:
       We already have a new RedisSortedSet instance we created and called this method on it. It is "this". So instead of having getIntersection create and return a new instance, I think it should just be something like "void computeIntersection(sets, aggregator)" and the intersection will be stored in itself. Then the next two lines would store "this" in the region and return getSortedSetSize().

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +288,49 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return Coder.doubleToBytes(score);
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList(), new double[] {});
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.getMember());
+        if (existingValue == null) {
+          double score;
+          // Redis math and Java math are different when handling infinity. Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            score = 0;
+          } else if (weight == 1) {
+            score = entry.getScore();
+          } else if (Double.isInfinite(weight) && entry.score == 0D) {
+            score = 0D;
+          } else {
+            score = entry.score * weight;

Review comment:
       use getScore()

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +650,84 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member, retVal);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member, retVal);
+      } else {
+        newScore = getMinScoreForMember(sets, member, retVal);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {
+          throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+        }
+        retVal.memberAdd(member, newScore);
+      }
+    }
+    return retVal;
+  }
+
+  private Double getSumOfScoresForMember(List<RedisSortedSet> sets, byte[] member,

Review comment:
       change "Double" to "double"

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +650,84 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {

Review comment:
       instead of testing for the different ZAggregator types here and having three methods that implement the logic, couldn't have just one method (name it double getIntersectionScore(sets, member, aggregator)) and then when it comes time to compute the new score value call "total = aggregator.getFunction().apply(total, entry.score)"? If you need to you could enhance Zaggregator but I think the Function on it is all you need.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -164,6 +174,17 @@ public long zrevrank(RedisKey key, byte[] member) {
   @Override
   public long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
       ZAggregator aggregator) {
+    List<RedisKey> keysToLock = lockKeys(destinationKey, keyWeights);
+
+    return stripedExecute(destinationKey, keysToLock,
+        () -> new RedisSortedSet(Collections.emptyList(), new double[] {}).zunionstore(

Review comment:
       If you add that new RedisSortedSet constructor that just takes a size then you should use it here with a size of 0. 

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +288,49 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return Coder.doubleToBytes(score);
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList(), new double[] {});
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.getMember());

Review comment:
       But at this point in the code I think "members" is empty. When this method is called from the FunctionExecutor class it does it on a new empty instance it creates. And at this point it has not added anything to that instance. So I think looking at this instance's state here is wrong. Should this loop just add the correct weight to each existing entry?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            keyWeights.get(i).setWeight(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (keyWeights.size() != numKeys) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    int bucket = command.getKey().getBucketId();
+    for (ZKeyWeight keyWeight : keyWeights) {
+      if (keyWeight.getKey().getBucketId() != bucket) {
+        return RedisResponse.crossSlot(ERROR_WRONG_SLOT);

Review comment:
       I saw another place in your code were you checked that all the sets are local. That probably is not needed since you check that they are all on the same bucket here.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +650,84 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member, retVal);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member, retVal);
+      } else {
+        newScore = getMinScoreForMember(sets, member, retVal);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {
+          throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+        }
+        retVal.memberAdd(member, newScore);
+      }
+    }
+    return retVal;
+  }
+
+  private Double getSumOfScoresForMember(List<RedisSortedSet> sets, byte[] member,

Review comment:
       Oh you needed to use Double so you could return null. I think with the idea that we are computing the intersection on "this" (i.e. the retVal parameter goes away) and since these three methods will become one (by passing the aggregator into it) instead of it returning a Double/double it can return void and when it would have returned a non-null total it can instead check it for NAN and then just store it in this (i.e. memberAdd(member, total)).

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {

Review comment:
       This seems like overkill (allWeightsAreOne). When I first saw it I thought "why do all the weights have to be 1". 
    It only seems possible if they supply more than one  WEIGHTS list. For example "ZINTERSTORE dst 2 set1 set2 WEIGHTS 1 2 WEIGHTS 2 3". But isn't that illegal any way? If you are trying to make sure that WEIGHTS does not happen more than once wouldn't a simple boolean that it has been seen let you do that? AGGREGATE may have the same issue (does native redis let you repeat it?). Does native redis enforce the order (i.e. WEIGHTS first then AGGREGATE)? If so we should also.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +650,84 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member, retVal);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member, retVal);
+      } else {
+        newScore = getMinScoreForMember(sets, member, retVal);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {
+          throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+        }
+        retVal.memberAdd(member, newScore);
+      }
+    }
+    return retVal;
+  }
+
+  private Double getSumOfScoresForMember(List<RedisSortedSet> sets, byte[] member,
+      RedisSortedSet retVal) {
+    double runningTotal = 0;
+    for (RedisSortedSet set : sets) {
+      if (set.members.containsKey(member)) {
+        runningTotal += set.members.get(member).score;
+      } else {
+        return null;
+      }
+    }
+    retVal.memberAdd(member, runningTotal);
+    return runningTotal;
+  }
+
+  private Double getMaxScoreForMember(List<RedisSortedSet> sets, byte[] member,
+      RedisSortedSet retVal) {
+    double runningMax = Double.MIN_VALUE;
+    for (RedisSortedSet set : sets) {
+      if (set.members.containsKey(member)) {
+        double newScore = set.members.get(member).score;
+        if (newScore > runningMax) {
+          runningMax = newScore;
+        }
+      } else {
+        return null;
+      }
+    }
+    retVal.memberAdd(member, runningMax);
+    return runningMax;
+  }
+
+  private Double getMinScoreForMember(List<RedisSortedSet> sets, byte[] member,

Review comment:
       change "Double" to "double"

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +650,84 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member, retVal);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member, retVal);
+      } else {
+        newScore = getMinScoreForMember(sets, member, retVal);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {
+          throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+        }
+        retVal.memberAdd(member, newScore);
+      }
+    }
+    return retVal;
+  }
+
+  private Double getSumOfScoresForMember(List<RedisSortedSet> sets, byte[] member,
+      RedisSortedSet retVal) {
+    double runningTotal = 0;
+    for (RedisSortedSet set : sets) {
+      if (set.members.containsKey(member)) {
+        runningTotal += set.members.get(member).score;

Review comment:
       use getScore()
   Also instead of calling both containsKey and get (which does two map lookups) just call get and if its result is not null compute the runningTotal.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] dschneider-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
dschneider-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r717843213



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member);
+      } else {
+        newScore = getMinScoreForMember(sets, member);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {
+          throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+        }
+        retVal.memberAdd(member, newScore);
+      }
+    }
+  }
+
+  private Double getSumOfScoresForMember(List<RedisSortedSet> sets, byte[] memberName) {

Review comment:
       are these three methods also dead code now?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member);
+      } else {
+        newScore = getMinScoreForMember(sets, member);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {

Review comment:
       the new "zinterstore" impl does not do this NaN check. Did you decide it was not needed?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {

Review comment:
       should this method be removed? I don't see any callers




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] dschneider-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
dschneider-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r716937430



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZInterStoreExecutor.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZInterStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>((int) numKeys);
+    List<Double> weights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!weights.isEmpty()) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            weights.add(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));

Review comment:
       sounds good




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r716645606



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
##########
@@ -345,6 +345,14 @@ public RedisKeyCommands getKeyCommands() {
     return (Set<DistributedMember>) (Set<?>) partitionedRegion.getRegionAdvisor().adviseDataStore();
   }
 
+  /**
+   * A means to consistently order multiple keys for locking to avoid typical deadlock situations.
+   * Note that the list of keys is sorted in place.
+   */
+  public void orderForLocking(List<RedisKey> keys) {
+    keys.sort(stripedCoordinator::compareStripes);
+  }

Review comment:
       You don't need to use this anymore since locking (and sorting) is automatically handled in the `LockingStripedCoordinator` now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] dschneider-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
dschneider-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r707783555



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +625,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());
+
+    for (RedisSortedSet set : sets) {
+      for (OrderedSetEntry entry : set.members.values()) {
+        Double newScore;
+        if (aggregator.equals(ZAggregator.SUM)) {
+          newScore = recursivelySumScoresForMember(sets, entry.member, 0D);
+        } else if (aggregator.equals(ZAggregator.MAX)) {
+          newScore = recursivelyGetMaxScoreForMember(sets, entry.member, Double.MIN_VALUE);
+        } else {
+          newScore = recursivelyGetMinScoreForMember(sets, entry.member, Double.MAX_VALUE);
+        }
+
+        if (newScore != null) {
+          if (newScore.isNaN()) {
+            throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+          }
+          retVal.memberAdd(entry.getMember(), Coder.doubleToBytes(newScore));
+        }
+      }
+    }
+    return retVal;
+  }
+
+  private Double recursivelySumScoresForMember(List<RedisSortedSet> sets, byte[] member,
+      Double runningTotal) {
+    if (sets.isEmpty()) {
+      return runningTotal;
+    }
+
+    if (runningTotal.isNaN()) {
+      return runningTotal;
+    }
+
+    if (sets.get(0).members.containsKey(member)) {

Review comment:
       instead of doing containsKey followed by get, just do get and check for it returning null. This does one map lookup instead of two

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +625,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());
+
+    for (RedisSortedSet set : sets) {
+      for (OrderedSetEntry entry : set.members.values()) {

Review comment:
       Since all you care about seeing in this for loop is each member in "members" I think you can just do this "for (byte[] member: set.members.keySet())".
   Or you could use "set.members.fastForEachKey(key->{...});"

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZInterStoreExecutor.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZInterStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>((int) numKeys);
+    List<Double> weights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!weights.isEmpty()) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            weights.add(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (sourceKeys.size() != numKeys) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    int bucket = command.getKey().getBucketId();
+    for (RedisKey key : sourceKeys) {
+      if (key.getBucketId() != bucket) {
+        return RedisResponse.crossSlot(ERROR_WRONG_SLOT);
+      }
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>();

Review comment:
       this list can be presized.
   How about just having this List and get rid of the sourceKeys list and and the weights list/array. Create the initial ZKeyWeight with just the key (its double can default to 1). Then when you parse the weights you can call setWeight on the correct ZKeyWeight.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -661,7 +793,7 @@ public static int javaImplementationOfAnsiCMemCmp(byte[] array1, byte[] array2)
       Sizeable {
     byte[] member;
     byte[] scoreBytes;
-    double score;
+    Double score;

Review comment:
       Why does this need to be an Object (Double)? The old "double" saves memory.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +625,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());
+
+    for (RedisSortedSet set : sets) {
+      for (OrderedSetEntry entry : set.members.values()) {
+        Double newScore;
+        if (aggregator.equals(ZAggregator.SUM)) {
+          newScore = recursivelySumScoresForMember(sets, entry.member, 0D);
+        } else if (aggregator.equals(ZAggregator.MAX)) {
+          newScore = recursivelyGetMaxScoreForMember(sets, entry.member, Double.MIN_VALUE);
+        } else {
+          newScore = recursivelyGetMinScoreForMember(sets, entry.member, Double.MAX_VALUE);
+        }
+
+        if (newScore != null) {
+          if (newScore.isNaN()) {
+            throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+          }
+          retVal.memberAdd(entry.getMember(), Coder.doubleToBytes(newScore));
+        }
+      }
+    }
+    return retVal;
+  }
+
+  private Double recursivelySumScoresForMember(List<RedisSortedSet> sets, byte[] member,
+      Double runningTotal) {
+    if (sets.isEmpty()) {
+      return runningTotal;
+    }
+
+    if (runningTotal.isNaN()) {
+      return runningTotal;
+    }
+
+    if (sets.get(0).members.containsKey(member)) {
+      Double score = sets.get(0).members.get(member).score;

Review comment:
       use "double" here

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +625,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());
+
+    for (RedisSortedSet set : sets) {
+      for (OrderedSetEntry entry : set.members.values()) {
+        Double newScore;
+        if (aggregator.equals(ZAggregator.SUM)) {
+          newScore = recursivelySumScoresForMember(sets, entry.member, 0D);
+        } else if (aggregator.equals(ZAggregator.MAX)) {
+          newScore = recursivelyGetMaxScoreForMember(sets, entry.member, Double.MIN_VALUE);
+        } else {
+          newScore = recursivelyGetMinScoreForMember(sets, entry.member, Double.MAX_VALUE);
+        }
+
+        if (newScore != null) {
+          if (newScore.isNaN()) {
+            throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+          }
+          retVal.memberAdd(entry.getMember(), Coder.doubleToBytes(newScore));
+        }
+      }
+    }
+    return retVal;
+  }
+
+  private Double recursivelySumScoresForMember(List<RedisSortedSet> sets, byte[] member,

Review comment:
       I think the recursion in all three of these methods should be replaced with a simple iteration of sets checking each one for member and computing the sum/min/max. If it finds member in each set then it can store that value on the retVal set.
   Recursion is usually slower, harder for some to understand, and could have a stack overflow.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZInterStoreExecutor.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZInterStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>((int) numKeys);
+    List<Double> weights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!weights.isEmpty()) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            weights.add(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));

Review comment:
       do you need a hasNext check here also?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +289,52 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return byteIncr;
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList());
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.member);
+        if (existingValue == null) {
+          double score;
+          // Redis math and Java math are different when handling infinity. Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            score = 0;
+          } else if (weight == 1) {
+            score = entry.getScore();
+          } else {
+            double newScore = entry.score * weight;

Review comment:
       I think this code would be easier to understand if you just set the existing "score" variable instead of introducing "newScore".

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +625,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());
+
+    for (RedisSortedSet set : sets) {
+      for (OrderedSetEntry entry : set.members.values()) {
+        Double newScore;
+        if (aggregator.equals(ZAggregator.SUM)) {
+          newScore = recursivelySumScoresForMember(sets, entry.member, 0D);
+        } else if (aggregator.equals(ZAggregator.MAX)) {
+          newScore = recursivelyGetMaxScoreForMember(sets, entry.member, Double.MIN_VALUE);
+        } else {
+          newScore = recursivelyGetMinScoreForMember(sets, entry.member, Double.MAX_VALUE);
+        }
+
+        if (newScore != null) {
+          if (newScore.isNaN()) {
+            throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+          }
+          retVal.memberAdd(entry.getMember(), Coder.doubleToBytes(newScore));

Review comment:
       It seems like if you passed retVal in to recursively*ScoreForMember then it could do the retVal.memberAdd if needed. That would also allow us to get rid of this use of "Double". If you get rid of the recursion then these methods could just take a member, input sets, and a set to fill.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZInterStoreExecutor.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZInterStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>((int) numKeys);
+    List<Double> weights = new ArrayList<>((int) numKeys);

Review comment:
       could weights be made a "double[]"?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +625,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());
+
+    for (RedisSortedSet set : sets) {

Review comment:
       It looks to me like if you have 10 sets then for each member that is in each set this will compute the score 10 times. I think we only need to compute it once. So do we really need to iterate over all the sets at the top level? Since we will only add members to the result that are in every one of the sets, couldn't we just iterate over the members of one of the sets at the top level? I think the best one to iterate would be the smallest one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r714291763



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +625,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());
+
+    for (RedisSortedSet set : sets) {
+      for (OrderedSetEntry entry : set.members.values()) {
+        Double newScore;
+        if (aggregator.equals(ZAggregator.SUM)) {
+          newScore = recursivelySumScoresForMember(sets, entry.member, 0D);
+        } else if (aggregator.equals(ZAggregator.MAX)) {
+          newScore = recursivelyGetMaxScoreForMember(sets, entry.member, Double.MIN_VALUE);
+        } else {
+          newScore = recursivelyGetMinScoreForMember(sets, entry.member, Double.MAX_VALUE);
+        }
+
+        if (newScore != null) {
+          if (newScore.isNaN()) {
+            throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+          }
+          retVal.memberAdd(entry.getMember(), Coder.doubleToBytes(newScore));
+        }
+      }
+    }
+    return retVal;
+  }
+
+  private Double recursivelySumScoresForMember(List<RedisSortedSet> sets, byte[] member,
+      Double runningTotal) {
+    if (sets.isEmpty()) {
+      return runningTotal;
+    }
+
+    if (runningTotal.isNaN()) {
+      return runningTotal;
+    }
+
+    if (sets.get(0).members.containsKey(member)) {

Review comment:
       I'm not entirely sure what you mean. it's a get followed by a contains, not the other way around




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r713239683



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -62,6 +62,24 @@ public long zcount(RedisKey key, SortedSetScoreRangeOptions rangeOptions) {
         () -> getRedisSortedSet(key, false).zincrby(getRegion(), key, increment, member));
   }
 
+  @Override
+  public long zinterstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisKey> keysToLock = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight kw : keyWeights) {
+      getRegionProvider().ensureKeyIsLocal(kw.getKey());
+      keysToLock.add(kw.getKey());
+    }
+    getRegionProvider().ensureKeyIsLocal(destinationKey);
+    keysToLock.add(destinationKey);
+
+    getRegionProvider().orderForLocking(keysToLock);

Review comment:
       This block is duplicated in the `zunionstore()` method and can probably be pulled out into a "getKeysToLock()" method (or some similar name).

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -578,6 +627,91 @@ private int getMaxElementsToReturn(AbstractSortedSetRangeOptions<?> rangeOptions
     return result;
   }
 
+  private RedisSortedSet getIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList());

Review comment:
       A more efficient approach might be to start with a collection (in this case it would be a copy of the smallest set in `sets`) and then iterate it and remove members from it if they're not found in all of the other sets. This has the benefit that if we don't find the member in one of the other sets we can immediately go to the next member, and if we ever remove all the members from the collection, we know we can immediately return since the returned set will be empty. This would need some extra logic to handle Redis' behaviour when one of the input sets is empty though (i.e. the key doesn't exist), since although logically the intersection with an empty set should be an empty set, Redis treats these as no-ops and effectively ignores them in the intersection computation.
   
   We could also do the weighting at this point as part of the SUM/MAX/MIN calculation, rather than weighting every member in every set first, then calculating the intersection, since then we're not weighting any members that we know aren't going to contribute to the final outcome.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +289,54 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return byteIncr;
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList());
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.member);
+        if (existingValue == null) {
+          double score;
+          // Redis math and Java math are different when handling infinity. Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            score = 0;
+          } else if (weight == 1) {
+            score = entry.getScore();
+          } else if (Double.isInfinite(weight) && entry.score == 0D) {
+            score = 0D;
+          } else {
+            double newScore = entry.score * weight;
+            if (Double.isNaN(newScore)) {
+              throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);

Review comment:
       I think that given the Redis/Java math handling above, it's not possible for `newScore` to be NaN here, and that even if it was, it would not be correct to throw here, since Redis does not have that behaviour. Having some integration tests to handle cases where NaN might appear (both here, and when doing the SUM aggregation) would be good, to validate that we're matching Redis' behaviour in that respect. For example, the below test fails when run against Radish but passes for native Redis:
   ```
     @Test
     public void testNaNSums() {
       String member = "member";
       jedis.zadd(KEY1, Double.POSITIVE_INFINITY, member);
       jedis.zadd(KEY2, Double.NEGATIVE_INFINITY, member);
       assertThat(jedis.zinterstore(NEW_SET, KEY1, KEY2)).isOne();
       assertThat(jedis.zscore(NEW_SET, member)).isZero();
     }
   ```

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZInterStoreIntegrationTest.java
##########
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import org.assertj.core.data.Offset;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.ZParams;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+
+public abstract class AbstractZInterStoreIntegrationTest implements RedisIntegrationTest {
+
+  private static final String NEW_SET = "{user1}new";
+  private static final String KEY1 = "{user1}sset1";
+  private static final String KEY2 = "{user1}sset2";
+  private static final String KEY3 = "{user1}sset3";
+
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void shouldError_givenTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZINTERSTORE, 3);
+  }
+
+  @Test
+  public void shouldError_givenWrongKeyType() {
+    final String STRING_KEY = "{user1}stringKey";
+    jedis.set(STRING_KEY, "value");
+    assertThatThrownBy(() -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2",
+        STRING_KEY, KEY1)).hasMessage("WRONGTYPE " + RedisConstants.ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void shouldError_givenSetsCrossSlots() {
+    final String WRONG_KEY = "{user2}another";
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2", WRONG_KEY,
+            KEY1)).hasMessage("CROSSSLOT " + RedisConstants.ERROR_WRONG_SLOT);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooLarge() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2", KEY1))
+            .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooSmall() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1", KEY1, KEY2))
+            .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooManyWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHTS", "2", "3")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooFewWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2",
+            KEY1, KEY2, "WEIGHTS", "1")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenWeightNotANumber() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHTS", "not-a-number"))
+                .hasMessage("ERR " + RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT);
+  }
+
+  @Test
+  public void shouldError_givenWeightsWithoutAnyValues() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHTS")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenMultipleWeightKeywords() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHT", "1.0", "WEIGHT", "2.0"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenUnknownAggregate() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "AGGREGATE", "UNKNOWN", "WEIGHTS", "1"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenAggregateKeywordWithoutValue() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "AGGREGATE")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenMultipleAggregates() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHTS", "1", "AGGREGATE", "SUM", "MIN"))
+                .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfOne_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = convertToTuples(scores, (ignore, value) -> value);
+    jedis.zadd(KEY1, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(1), KEY1))
+        .isEqualTo(expectedResults.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfZero_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = convertToTuples(scores, (ignore, value) -> 0D);
+    jedis.zadd(KEY1, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(0), KEY1))
+        .isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfPositiveInfinity_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults =
+        convertToTuples(scores, (ignore, value) -> value > 0 ? Double.POSITIVE_INFINITY : value);
+    jedis.zadd(KEY1, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(Double.POSITIVE_INFINITY),
+        KEY1)).isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfNegativeInfinity_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    jedis.zadd(KEY1, scores);
+
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player4", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", Double.NEGATIVE_INFINITY));
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(Double.NEGATIVE_INFINITY),
+        KEY1)).isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenWeightOfN_andOneRedisSortedSet() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    jedis.zadd(KEY1, scores);
+
+    double multiplier = 2.71D;
+
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", multiplier));
+    expectedResults.add(new Tuple("player4", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", 3.2D * multiplier));
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(multiplier), KEY1))
+        .isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenMultipleRedisSortedSets() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", 2D));
+    expectedResults.add(new Tuple("player4", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", 3.2D * 2));
+
+    jedis.zadd(KEY1, scores);
+    jedis.zadd(KEY2, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams(), KEY1, KEY2))
+        .isEqualTo(expectedResults.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenTwoRedisSortedSets_withDifferentWeights() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", 3D));
+    expectedResults.add(new Tuple("player4", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", 3.2D * 3));
+
+    jedis.zadd(KEY1, scores);
+    jedis.zadd(KEY2, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(1, 2), KEY1, KEY2))
+        .isEqualTo(expectedResults.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenMultipleIdenticalRedisSortedSets_withDifferentPositiveWeights() {
+    Map<String, Double> scores = buildMapOfMembersAndScores();
+    Set<Tuple> expectedResults = new LinkedHashSet<>();
+    expectedResults.add(new Tuple("player1", Double.NEGATIVE_INFINITY));
+    expectedResults.add(new Tuple("player2", 0D));
+    expectedResults.add(new Tuple("player3", 4.5D));
+    expectedResults.add(new Tuple("player4", Double.POSITIVE_INFINITY));
+    expectedResults.add(new Tuple("player5", 3.2D * 4.5D));
+
+    jedis.zadd(KEY1, scores);
+    jedis.zadd(KEY2, scores);
+    jedis.zadd(KEY3, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().weights(1D, 2D, 1.5D), KEY1, KEY2, KEY3))
+        .isEqualTo(expectedResults.size());
+
+    Set<Tuple> actualResults = jedis.zrangeWithScores(NEW_SET, 0, scores.size());
+    assertThatActualScoresAreVeryCloseToExpectedScores(expectedResults, actualResults);
+  }
+
+  @Test
+  public void shouldStoreIntersection_givenOneSetDoesNotExist() {
+    Map<String, Double> scores = buildMapOfMembersAndScores(1, 10);
+    Set<Tuple> expectedResults = convertToTuples(scores, (i, x) -> x);
+    jedis.zadd(KEY1, scores);
+
+    assertThat(jedis.zinterstore(NEW_SET, KEY1, KEY2)).isEqualTo(scores.size());
+
+    Set<Tuple> results = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expectedResults, results);
+  }
+
+  @Test
+  public void shouldStoreNothingAtDestinationKey_givenTwoNonIntersectingSets() {
+    Map<String, Double> scores = buildMapOfMembersAndScores(1, 5);
+    Map<String, Double> nonIntersectionScores = buildMapOfMembersAndScores(6, 10);
+    jedis.zadd(KEY1, scores);
+    jedis.zadd(KEY2, nonIntersectionScores);
+
+    assertThat(jedis.zinterstore(NEW_SET, KEY1, KEY2)).isZero();
+
+    assertThat(jedis.zrangeWithScores(NEW_SET, 0, 10)).isEmpty();
+  }
+
+  @Test
+  public void shouldStoreSumOfIntersection_givenThreePartiallyOverlappingSets() {
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.SUM),
+        KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleSumOfScores("player" + i, scores1, scores2, scores3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMaxOfIntersection_givenThreePartiallyOverlappingSets() {
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MAX),
+        KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleMaxOfScores("player" + i, scores1, scores2, scores3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMinOfIntersection_givenThreePartiallyOverlappingSets() {
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    assertThat(jedis.zinterstore(NEW_SET, new ZParams().aggregate(ZParams.Aggregate.MIN),
+        KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleMinOfScores("player" + i, scores1, scores2, scores3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreSumOfIntersection_givenThreePartiallyOverlappingSets_andWeights() {
+    double weight1 = 0D;
+    double weight2 = 42D;
+    double weight3 = -7.3D;
+
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    ZParams zParams = new ZParams().aggregate(ZParams.Aggregate.SUM)
+        .weights(weight1, weight2, weight3);
+
+    assertThat(jedis.zinterstore(NEW_SET, zParams, KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleSumOfScoresWithWeights("player" + i, scores1, scores2, scores3, weight1,
+          weight2, weight3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMaxOfIntersection_givenThreePartiallyOverlappingSets_andWeights() {
+    double weight1 = 0D;
+    double weight2 = 42D;
+    double weight3 = -7.3D;
+
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    ZParams zParams = new ZParams().aggregate(ZParams.Aggregate.MAX)
+        .weights(weight1, weight2, weight3);
+
+    assertThat(jedis.zinterstore(NEW_SET, zParams, KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleMaxOfScoresWithWeights("player" + i, scores1, scores2, scores3, weight1,
+          weight2, weight3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMinOfIntersection_givenThreePartiallyOverlappingSets_andWeights() {
+    double weight1 = 0D;
+    double weight2 = 42D;
+    double weight3 = -7.3D;
+
+    Map<String, Double> scores1 = buildMapOfMembersAndScores(1, 10);
+    Map<String, Double> scores2 = buildMapOfMembersAndScores(6, 13);
+    Map<String, Double> scores3 = buildMapOfMembersAndScores(4, 11);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    ZParams zParams = new ZParams().aggregate(ZParams.Aggregate.MIN)
+        .weights(weight1, weight2, weight3);
+
+    assertThat(jedis.zinterstore(NEW_SET, zParams, KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleMinOfScoresWithWeights("player" + i, scores1, scores2, scores3, weight1,
+          weight2, weight3));
+    }
+
+    Set<Tuple> actual = jedis.zrangeWithScores(NEW_SET, 0, 10);
+
+    assertThatActualScoresAreVeryCloseToExpectedScores(expected, actual);
+  }
+
+  @Test
+  public void shouldStoreMaxOfIntersection_givenThreePartiallyOverlappingSetsWithIdenticalScores() {
+    double score = 3.141592;
+    Map<String, Double> scores1 = buildMapOfMembersAndIdenticalScores(1, 10, score);
+    Map<String, Double> scores2 = buildMapOfMembersAndIdenticalScores(6, 13, score);
+    Map<String, Double> scores3 = buildMapOfMembersAndIdenticalScores(4, 11, score);
+
+    jedis.zadd(KEY1, scores1);
+    jedis.zadd(KEY2, scores2);
+    jedis.zadd(KEY3, scores3);
+
+    ZParams zParams = new ZParams().aggregate(ZParams.Aggregate.MAX);
+
+    assertThat(jedis.zinterstore(NEW_SET, zParams, KEY1, KEY2, KEY3)).isEqualTo(5);
+
+    Set<Tuple> expected = new HashSet<>();
+    for (int i = 6; i <= 10; i++) {
+      expected.add(tupleMaxOfScores("player" + i, scores1, scores2, scores3));

Review comment:
       This can be simplified to just
   ```
   expected.add(new Tuple("player" + i, score));
   ```

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZInterStoreExecutor.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZInterStoreExecutor extends AbstractExecutor {

Review comment:
       This class and the `ZUnionStoreExecutor` class should be modified to extend a parent class to prevent duplication of code, since the only difference between the files is what method is called on line 121. This would make implementing other similar sorted set comments in the future much simpler too, as they could also extend the parent class and just override the single method.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZInterStoreIntegrationTest.java
##########
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+import org.assertj.core.data.Offset;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+import redis.clients.jedis.Tuple;
+import redis.clients.jedis.ZParams;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.RedisConstants;
+
+public abstract class AbstractZInterStoreIntegrationTest implements RedisIntegrationTest {
+
+  private static final String NEW_SET = "{user1}new";
+  private static final String KEY1 = "{user1}sset1";
+  private static final String KEY2 = "{user1}sset2";
+  private static final String KEY3 = "{user1}sset3";
+
+  private JedisCluster jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void shouldError_givenTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZINTERSTORE, 3);
+  }
+
+  @Test
+  public void shouldError_givenWrongKeyType() {
+    final String STRING_KEY = "{user1}stringKey";
+    jedis.set(STRING_KEY, "value");
+    assertThatThrownBy(() -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2",
+        STRING_KEY, KEY1)).hasMessage("WRONGTYPE " + RedisConstants.ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void shouldError_givenSetsCrossSlots() {
+    final String WRONG_KEY = "{user2}another";
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2", WRONG_KEY,
+            KEY1)).hasMessage("CROSSSLOT " + RedisConstants.ERROR_WRONG_SLOT);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooLarge() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2", KEY1))
+            .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenNumkeysTooSmall() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1", KEY1, KEY2))
+            .hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooManyWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "1",
+            KEY1, "WEIGHTS", "2", "3")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenTooFewWeights() {
+    assertThatThrownBy(
+        () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "2",
+            KEY1, KEY2, "WEIGHTS", "1")).hasMessage("ERR " + RedisConstants.ERROR_SYNTAX);
+  }
+
+  @Test
+  public void shouldError_givenWeightNotANumber() {

Review comment:
       Could this be "givenWeightNotAFloat" to avoid confusion with `NaN`, which while not a number is technically a float value?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715154104



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());

Review comment:
       To prevent possible integer overflow when this value is later cast to an `int`; `numKeys` should be declared as an `int` and this line changed to `numKeys = narrowLongToInt(bytesToLong(argIterator.next()));`

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -288,6 +289,54 @@ long zcount(SortedSetScoreRangeOptions rangeOptions) {
     return byteIncr;
   }
 
+  long zinterstore(RegionProvider regionProvider, RedisKey key, List<ZKeyWeight> keyWeights,
+      ZAggregator aggregator) {
+    List<RedisSortedSet> sets = new ArrayList<>(keyWeights.size());
+    for (ZKeyWeight keyWeight : keyWeights) {
+      RedisSortedSet set =
+          regionProvider.getTypedRedisData(REDIS_SORTED_SET, keyWeight.getKey(), false);
+
+      if (set == NULL_REDIS_SORTED_SET) {
+        continue;
+      }
+
+      double weight = keyWeight.getWeight();
+      RedisSortedSet weightedSet = new RedisSortedSet(Collections.emptyList());
+
+      for (AbstractOrderedSetEntry entry : set.members.values()) {
+        OrderedSetEntry existingValue = members.get(entry.member);
+        if (existingValue == null) {
+          double score;
+          // Redis math and Java math are different when handling infinity. Specifically:
+          // Java: INFINITY * 0 = NaN
+          // Redis: INFINITY * 0 = 0
+          if (weight == 0) {
+            score = 0;
+          } else if (weight == 1) {
+            score = entry.getScore();
+          } else if (Double.isInfinite(weight) && entry.score == 0D) {
+            score = 0D;
+          } else {
+            double newScore = entry.score * weight;
+            if (Double.isNaN(newScore)) {
+              throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);

Review comment:
       The test that I included here is still failing when run against Geode Redis and passing when run against native Redis. I think that what's needed is a check in the `getSumOfScoresForMember()` method to see if we're about to add positive infinity to negative infinity and if so, not return NaN, but return 0, so that we can match Redis' behaviour. We're already avoiding doing Java math if we know the result would be different from Redis (see the comments just above here about `INFINITY * 0`) so adding in this check isn't deviating from what we've already decided we want to do.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }

Review comment:
       Rather than checking the entire contents of `keyWeights` here (which could potentially be a very long list) it might be better to have a boolean flag called "weightsParsed" or something, which we set to true the first time we parse a `WEIGHTS` argument and return an error if we check it and it's already true.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }

Review comment:
       This seems like it could be broken up into two separate loops; one where we parse all the keys (since we know how many there should be), and one where we parse the weights and any aggregate arguments that might be present. That would avoid us having to check the size of `keyWeights` for every iteration after we're done parsing the keys, and would break the method up a bit and make it easier to read. It might be useful to use a `ListIterator` to help with this, since they allow you to begin iterating from a certain index of a List (in this case, the index of the first element after all of the keys): `ListIterator<byte[]> iterator = commandElements.listIterator(numKeys + 2);`

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -164,6 +174,17 @@ public long zrevrank(RedisKey key, byte[] member) {
   @Override
   public long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
       ZAggregator aggregator) {
+    List<RedisKey> keysToLock = lockKeys(destinationKey, keyWeights);
+
+    return stripedExecute(destinationKey, keysToLock,
+        () -> new RedisSortedSet(Collections.emptyList(), new double[] {}).zunionstore(
+            getRegionProvider(),
+            destinationKey, keyWeights, aggregator));
+  }
+
+
+  /************* Helper Methods *************/
+  private List<RedisKey> lockKeys(RedisKey destinationKey, List<ZKeyWeight> keyWeights) {

Review comment:
       This method name is potentially misleading, since it doesn't actually lock the keys, just get a list of keys in the appropriate order for locking by the `stripedExecute()` method.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            keyWeights.get(i).setWeight(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (keyWeights.size() != numKeys) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    int bucket = command.getKey().getBucketId();

Review comment:
       This check should be on the slot ID of the key, not the bucket ID, since each bucket has multiple slots in it. As this is, you could provide keys with non-matching slots and the command would succeed if they happened to be in the same bucket, which would be a significant difference from native Redis' behaviour.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {

Review comment:
       We should also check that `numKeys` is positive, and if not, return the same error as native Redis: "ERR at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE"

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            keyWeights.get(i).setWeight(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights

Review comment:
       This comment is incorrect, since the check on line 76 ensures that we return an error if we have fewer weights than keys. Rather, this return is for when we've finished parsing keys and encounter an option other than `WEIGHTS` or `AGGREGATE`

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            keyWeights.get(i).setWeight(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (keyWeights.size() != numKeys) {

Review comment:
       This check might be unnecessary if the check on line 56 is modified slightly to check if `numKeys` is greater than `commandElements.size() - 2` rather than being greater than `commandElements.size()`. What this line is really checking is that we parsed `numKeys` elements before running out of things to parse. By checking that (excluding the command name and the destination key, hence the -2) there are at minimum enough elements in `commandElements` for us to parse `numKeys` worth, we can know that if we get to this line, then `keyWeights.size()` is guaranteed to be equal to `numKeys`, because we only ever add to `keyWeights()` on line 67, and we stop adding there as soon as we've added `numKeys` elements.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r717812025



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {

Review comment:
       This method and the ones below it are no longer used and can be removed.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -676,8 +819,8 @@ public static int javaImplementationOfAnsiCMemCmp(byte[] array1, byte[] array2)
   public abstract static class AbstractOrderedSetEntry
       implements Comparable<AbstractOrderedSetEntry>,
       Sizeable {
-    byte[] member;
-    double score;
+    protected byte[] member;
+    protected double score = 0D;

Review comment:
       The visibility of these fields can be put back to being packing-private, as they don't need to be accessed by anything outside of this package. There's even an argument for making them private and having them only be accessed via getters and set via package-private setters.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -84,6 +85,10 @@ public int getSizeInBytes() {
     }
   }
 
+  RedisSortedSet(int size) {

Review comment:
       This is currently only used in the `RedisSortedSetCommandsFunctionExecutor.zunionstore()` method. Other places that could use this constructor to create an empty `RedisSortedSet` without needing to allocate a `double[]` are `RedisSortedSetCommandsFunctionExecutor.zinterstore()` and the `NullRedisSortedSet` constructor.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -164,7 +174,7 @@ public long zrevrank(RedisKey key, byte[] member) {
   @Override
   public long zunionstore(RedisKey destinationKey, List<ZKeyWeight> keyWeights,
       ZAggregator aggregator) {
-    List<RedisKey> keysToLock = new ArrayList<>(keyWeights.size());
+    List<RedisKey> keysToLock = getKeysToLock(destinationKey, keyWeights);

Review comment:
       Since we're calling `getKeysToLock()` here, we don't need to iterate over the `ZKeyWeight` list and repopulate the `keysToLock` list below.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_KEY_REQUIRED;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.narrowLongToInt;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+    ListIterator<byte[]> argIterator = commandElements.listIterator();
+    RedisResponse syntaxErrorResponse = RedisResponse.error(ERROR_SYNTAX);
+
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    // get the number of keys
+    int numKeys;
+    try {
+      numKeys = narrowLongToInt(Coder.bytesToLong(argIterator.next()));
+      if (numKeys > commandElements.size() - 2) {
+        return syntaxErrorResponse;
+      } else if (numKeys <= 0) {
+        return RedisResponse.error(ERROR_KEY_REQUIRED);
+      }
+    } catch (NumberFormatException ex) {
+      return syntaxErrorResponse;
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>(numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+    byte[] argument;
+
+    // get all the keys
+    for (int i = 0; i < numKeys; i++) {
+      if (!argIterator.hasNext()) {
+        return syntaxErrorResponse;
+      }
+      argument = argIterator.next();
+      if (Arrays.equals(toUpperCaseBytes(argument), bWEIGHTS)
+          || Arrays.equals(toUpperCaseBytes(argument), bAGGREGATE)) {
+        return syntaxErrorResponse;
+      }
+      keyWeights.add(new ZKeyWeight(new RedisKey(argument), 1D));
+    }
+
+    while (argIterator.hasNext()) {
+      argument = argIterator.next();
+      // found AGGREGATE keyword; parse aggregate
+      if (Arrays.equals(toUpperCaseBytes(argument), bAGGREGATE)) {
+        if (!argIterator.hasNext()) {
+          return syntaxErrorResponse;
+        }
+        argument = argIterator.next();
+        if (Arrays.equals(toUpperCaseBytes(argument), bWEIGHTS)) {
+          return syntaxErrorResponse; // there must be an aggregate between 'AGGREGATE' & 'WEIGHTS'
+        }
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(toUpperCaseBytes(argument)));
+        } catch (IllegalArgumentException e) {
+          return syntaxErrorResponse;
+        }
+        // found WEIGHTS keyword; parse weights
+      } else if (Arrays.equals(toUpperCaseBytes(argument), bWEIGHTS)) {
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return syntaxErrorResponse;
+          }
+          argument = argIterator.next();
+          if (Arrays.equals(toUpperCaseBytes(argument), bAGGREGATE)) {
+            return syntaxErrorResponse; // there must be # weights between 'WEIGHTS' & 'AGGREGATE'
+          }

Review comment:
       This behaviour doesn't match native Redis. The below test fails with Radish but passes with native Redis:
   ```
     @Test
     public void shouldReturnWeightNotAValidFloat_givenWeightsFollowedByCorrectNumberOfArgumentsIncludingAggregate() {
       assertThatThrownBy(
           () -> jedis.sendCommand(NEW_SET, Protocol.Command.ZINTERSTORE, NEW_SET, "3",
               KEY1, KEY2, KEY3, "WEIGHTS", "1", "AGGREGATE", "SUM"))
                   .hasMessage("ERR " + RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT);
     }
   ```
   I think that this check is not needed, because we only care if there are enough weights specified (if there aren't, `argIterator.hasNext()` returns false and we return a syntax error) and that those weights are valid floats (if they're not, we get a `NumberFormatException` below and return `ERROR_WEIGHT_NOT_A_FLOAT`).

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_KEY_REQUIRED;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.narrowLongToInt;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+    ListIterator<byte[]> argIterator = commandElements.listIterator();
+    RedisResponse syntaxErrorResponse = RedisResponse.error(ERROR_SYNTAX);
+
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    // get the number of keys
+    int numKeys;
+    try {
+      numKeys = narrowLongToInt(Coder.bytesToLong(argIterator.next()));
+      if (numKeys > commandElements.size() - 2) {
+        return syntaxErrorResponse;
+      } else if (numKeys <= 0) {
+        return RedisResponse.error(ERROR_KEY_REQUIRED);
+      }
+    } catch (NumberFormatException ex) {
+      return syntaxErrorResponse;
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>(numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+    byte[] argument;
+
+    // get all the keys
+    for (int i = 0; i < numKeys; i++) {
+      if (!argIterator.hasNext()) {
+        return syntaxErrorResponse;
+      }
+      argument = argIterator.next();
+      if (Arrays.equals(toUpperCaseBytes(argument), bWEIGHTS)
+          || Arrays.equals(toUpperCaseBytes(argument), bAGGREGATE)) {
+        return syntaxErrorResponse;
+      }

Review comment:
       This behaviour is incorrect. "WEIGHTS" and "AGGREGATE" are both valid Redis key names. The below test fails with our implementation, but passes on native Redis:
   ```
   @Test
     public void shouldNotReturnError_givenKeysNamedWeightsOrAggregate() {
       String weights = "WEIGHTS";
       jedis.zadd(weights, 1, "member");
       String aggregate = "AGGREGATE";
       jedis.zadd(aggregate, 1, "member");
       jedis.zinterstore(weights, weights, weights);
       jedis.zinterstore(aggregate, aggregate, aggregate);
     }
   ```

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            keyWeights.get(i).setWeight(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (keyWeights.size() != numKeys) {

Review comment:
       With the current state of the code, when I comment out this check, `AbstractZInterStoreIntegrationTest.shouldError_givenNumKeysTooLarge()` passes. If we specify too many keys, we catch that on line 54 and return there.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_KEY_REQUIRED;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.narrowLongToInt;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ListIterator;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+    ListIterator<byte[]> argIterator = commandElements.listIterator();
+    RedisResponse syntaxErrorResponse = RedisResponse.error(ERROR_SYNTAX);
+
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    // get the number of keys
+    int numKeys;
+    try {
+      numKeys = narrowLongToInt(Coder.bytesToLong(argIterator.next()));
+      if (numKeys > commandElements.size() - 2) {
+        return syntaxErrorResponse;
+      } else if (numKeys <= 0) {
+        return RedisResponse.error(ERROR_KEY_REQUIRED);
+      }
+    } catch (NumberFormatException ex) {
+      return syntaxErrorResponse;
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>(numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+    byte[] argument;
+
+    // get all the keys
+    for (int i = 0; i < numKeys; i++) {
+      if (!argIterator.hasNext()) {
+        return syntaxErrorResponse;
+      }
+      argument = argIterator.next();
+      if (Arrays.equals(toUpperCaseBytes(argument), bWEIGHTS)
+          || Arrays.equals(toUpperCaseBytes(argument), bAGGREGATE)) {
+        return syntaxErrorResponse;
+      }
+      keyWeights.add(new ZKeyWeight(new RedisKey(argument), 1D));
+    }
+
+    while (argIterator.hasNext()) {
+      argument = argIterator.next();
+      // found AGGREGATE keyword; parse aggregate
+      if (Arrays.equals(toUpperCaseBytes(argument), bAGGREGATE)) {
+        if (!argIterator.hasNext()) {
+          return syntaxErrorResponse;
+        }
+        argument = argIterator.next();
+        if (Arrays.equals(toUpperCaseBytes(argument), bWEIGHTS)) {
+          return syntaxErrorResponse; // there must be an aggregate between 'AGGREGATE' & 'WEIGHTS'
+        }

Review comment:
       This check is unnecessary, as the below one, where we try to set the `ZAggregator`, will return the same response if the next argument is not a valid aggregator type.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] dschneider-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
dschneider-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r716934456



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member);
+      } else {
+        newScore = getMinScoreForMember(sets, member);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {
+          throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+        }
+        retVal.memberAdd(member, newScore);
+      }
+    }
+  }
+
+  private Double getSumOfScoresForMember(List<RedisSortedSet> sets, byte[] memberName) {
+    OrderedSetEntry member;
+    double runningTotal = 0;
+
+    for (RedisSortedSet set : sets) {
+      if ((member = set.members.get(memberName)) != null) {
+        if (Double.isInfinite(runningTotal) && member.getScore() == -runningTotal) {
+          runningTotal = 0;
+        } else {
+          runningTotal += member.getScore();
+        }
+      } else {
+        return null;
+      }
+    }
+
+    this.memberAdd(memberName, runningTotal);
+    return runningTotal;
+  }
+
+  private Double getMaxScoreForMember(List<RedisSortedSet> sets, byte[] member) {
+    double runningMax = Double.MIN_VALUE;
+    for (RedisSortedSet set : sets) {
+      if (set.members.containsKey(member)) {

Review comment:
       You had a question about an old comment I made about containsKey. In this code instead of calling members.containsKey on line 724 followed by members.get on line 725 you could just call members.get and test for null. That way you do one map lookup instead of two.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZInterStoreExecutor.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZInterStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>((int) numKeys);
+    List<Double> weights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (sourceKeys.size() < numKeys) {
+        sourceKeys.add(new RedisKey(arg));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!weights.isEmpty()) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            weights.add(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));

Review comment:
       sounds good

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            keyWeights.get(i).setWeight(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (keyWeights.size() != numKeys) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    int bucket = command.getKey().getBucketId();
+    for (ZKeyWeight keyWeight : keyWeights) {
+      if (keyWeight.getKey().getBucketId() != bucket) {
+        return RedisResponse.crossSlot(ERROR_WRONG_SLOT);

Review comment:
       I'm not sure but I also saw a comment from Jens that seemed related to this. He would be a good one to ask about this too

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member);
+      } else {
+        newScore = getMinScoreForMember(sets, member);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {
+          throw new ArithmeticException(ERROR_OPERATION_PRODUCED_NAN);
+        }
+        retVal.memberAdd(member, newScore);
+      }
+    }
+  }
+
+  private Double getSumOfScoresForMember(List<RedisSortedSet> sets, byte[] memberName) {

Review comment:
       are these three methods also dead code now?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {
+    RedisSortedSet retVal = new RedisSortedSet(Collections.emptyList(), new double[] {});
+    RedisSortedSet smallestSet = sets.get(0);
+
+    for (RedisSortedSet set : sets) {
+      if (set.getSortedSetSize() < smallestSet.getSortedSetSize()) {
+        smallestSet = set;
+      }
+    }
+
+    for (byte[] member : smallestSet.members.keySet()) {
+      Double newScore;
+      if (aggregator.equals(ZAggregator.SUM)) {
+        newScore = getSumOfScoresForMember(sets, member);
+      } else if (aggregator.equals(ZAggregator.MAX)) {
+        newScore = getMaxScoreForMember(sets, member);
+      } else {
+        newScore = getMinScoreForMember(sets, member);
+      }
+
+      if (newScore != null) {
+        if (newScore.isNaN()) {

Review comment:
       the new "zinterstore" impl does not do this NaN check. Did you decide it was not needed?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -607,6 +669,87 @@ private void addIfMatching(GlobPattern matchPattern, List<byte[]> resultList, by
     }
   }
 
+  private void computeIntersection(List<RedisSortedSet> sets, ZAggregator aggregator) {

Review comment:
       should this method be removed? I don't see any callers




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r715196412



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            keyWeights.get(i).setWeight(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (keyWeights.size() != numKeys) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    int bucket = command.getKey().getBucketId();
+    for (ZKeyWeight keyWeight : keyWeights) {
+      if (keyWeight.getKey().getBucketId() != bucket) {
+        return RedisResponse.crossSlot(ERROR_WRONG_SLOT);

Review comment:
       do you mean this? https://github.com/apache/geode/pull/6861/files#diff-390c7c2da5112f714107592eaa475143c72fd69d19b1d1d3d8bb2ac25ed0f3d5R190




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r714326220



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZInterStoreExecutor.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZInterStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<RedisKey> sourceKeys = new ArrayList<>((int) numKeys);
+    List<Double> weights = new ArrayList<>((int) numKeys);

Review comment:
       it cannot. the weights must be able to be null, and we need to be able to check if the list is empty, and since it's pre-sized the length was always `numKeys`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer merged pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer merged pull request #6861:
URL: https://github.com/apache/geode/pull/6861


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] dschneider-pivotal commented on a change in pull request #6861: GEODE-9516: add ZINTERSTORE command

Posted by GitBox <gi...@apache.org>.
dschneider-pivotal commented on a change in pull request #6861:
URL: https://github.com/apache/geode/pull/6861#discussion_r716955729



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZStoreExecutor.java
##########
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WEIGHT_NOT_A_FLOAT;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_SLOT;
+import static org.apache.geode.redis.internal.netty.Coder.toUpperCaseBytes;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bAGGREGATE;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bWEIGHTS;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public abstract class ZStoreExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    List<byte[]> commandElements = command.getProcessedCommand();
+
+    Iterator<byte[]> argIterator = commandElements.iterator();
+    // Skip command and destination key
+    argIterator.next();
+    argIterator.next();
+
+    long numKeys;
+    try {
+      numKeys = Coder.bytesToLong(argIterator.next());
+    } catch (NumberFormatException ex) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    // Rough validation so that we can use numKeys to initialize the array sizes below.
+    if (numKeys > commandElements.size()) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    List<ZKeyWeight> keyWeights = new ArrayList<>((int) numKeys);
+    ZAggregator aggregator = ZAggregator.SUM;
+
+    while (argIterator.hasNext()) {
+      byte[] arg = argIterator.next();
+
+      if (keyWeights.size() < numKeys) {
+        keyWeights.add(new ZKeyWeight(new RedisKey(arg), 1D));
+        continue;
+      }
+
+      arg = toUpperCaseBytes(arg);
+      if (Arrays.equals(arg, bWEIGHTS)) {
+        if (!allWeightsAreOne(keyWeights)) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        for (int i = 0; i < numKeys; i++) {
+          if (!argIterator.hasNext()) {
+            return RedisResponse.error(ERROR_SYNTAX);
+          }
+          try {
+            keyWeights.get(i).setWeight(Coder.bytesToDouble(argIterator.next()));
+          } catch (NumberFormatException nex) {
+            return RedisResponse.error(ERROR_WEIGHT_NOT_A_FLOAT);
+          }
+        }
+        continue;
+      }
+
+      if (Arrays.equals(arg, bAGGREGATE)) {
+        try {
+          aggregator = ZAggregator.valueOf(Coder.bytesToString(argIterator.next()));
+        } catch (IllegalArgumentException | NoSuchElementException e) {
+          return RedisResponse.error(ERROR_SYNTAX);
+        }
+        continue;
+      }
+
+      // End up here if we have more keys than weights
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    if (keyWeights.size() != numKeys) {
+      return RedisResponse.error(ERROR_SYNTAX);
+    }
+
+    int bucket = command.getKey().getBucketId();
+    for (ZKeyWeight keyWeight : keyWeights) {
+      if (keyWeight.getKey().getBucketId() != bucket) {
+        return RedisResponse.crossSlot(ERROR_WRONG_SLOT);

Review comment:
       I'm not sure but I also saw a comment from Jens that seemed related to this. He would be a good one to ask about this too




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org