Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/05/27 00:25:48 UTC

[GitHub] [geode] pivotal-eshu opened a new pull request #6524: GEODE-9318: Implement ZREM command.

pivotal-eshu opened a new pull request #6524:
URL: https://github.com/apache/geode/pull/6524


   Thank you for submitting a contribution to Apache Geode.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `develop`)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   - [ ] Does `gradlew build` run cleanly?
   
   - [ ] Have you written or updated unit tests to verify your changes?
   
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   
   ### Note:
   Once the PR is submitted, please check Concourse for build issues and
   submit an update to your PR as soon as possible. If you need help, please send an
   email to dev@geode.apache.org.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] pivotal-eshu closed pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
pivotal-eshu closed pull request #6524:
URL: https://github.com/apache/geode/pull/6524


   





[GitHub] [geode] pivotal-eshu commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640827147



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();

Review comment:
       The issue I see with the above methods is that each thread loops over the same members in the same order, so the second thread is always trying to remove the member the first thread has just removed (understandably, since this is how we use synchronization to prevent concurrent modification of the sorted set).
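
       A minimal sketch of one way to avoid that lockstep, assuming it is acceptable for the second thread to walk the members in reverse order. This is not the PR's code: `OppositeOrderRemoval` and its methods are hypothetical names, and a `ConcurrentSkipListSet` stands in for the Redis sorted set so the example runs without a server.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the test: an in-memory set replaces the Redis sorted set.
class OppositeOrderRemoval {

  // Removes each member in the given order and returns how many removals succeeded.
  static int removeAll(Set<String> set, List<String> order) {
    int removed = 0;
    for (String member : order) {
      if (set.remove(member)) {
        removed++;
      }
    }
    return removed;
  }

  // Runs two threads that walk the same members in opposite orders and
  // returns the combined number of successful removals.
  static int removeConcurrently(int memberCount) {
    List<String> forward = new ArrayList<>();
    for (int i = 0; i < memberCount; i++) {
      forward.add("member_" + i);
    }
    List<String> backward = new ArrayList<>(forward);
    Collections.reverse(backward);

    Set<String> set = new ConcurrentSkipListSet<>(forward);
    AtomicInteger total = new AtomicInteger();
    Thread first = new Thread(() -> total.addAndGet(removeAll(set, forward)));
    Thread second = new Thread(() -> total.addAndGet(removeAll(set, backward)));
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for removers", e);
    }
    if (!set.isEmpty()) {
      throw new IllegalStateException("members left behind");
    }
    return total.get();
  }

  public static void main(String[] args) {
    // Each member is removed by exactly one thread, so the total equals the member count.
    System.out.println(removeConcurrently(1000));
  }
}
```

       With opposite orders the two threads only collide near the middle of the range, yet every member is still counted exactly once, so the same `totalRemoved` assertion would hold.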







[GitHub] [geode] sabbey37 commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
sabbey37 commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r643403494



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -43,4 +43,10 @@ public long zadd(RedisKey key, List<byte[]> scoresAndMembersToAdd, ZAddOptions o
   public byte[] zscore(RedisKey key, byte[] member) {
     return stripedExecute(key, () -> getRedisSortedSet(key, true).zscore(member));
   }
+
+  @Override
+  public long zrem(RedisKey key, List<byte[]> membersToRemove) {
+    return stripedExecute(key,
+        () -> getRedisSortedSet(key, false).zrem(getRegion(), key, membersToRemove));

Review comment:
       Thank you for adding those tests!







[GitHub] [geode] sabbey37 commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
sabbey37 commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640667051



##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";

Review comment:
       This variable could be final.

##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(memberScoreMap, totalRemoved),
+        (i) -> doZRem1(totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(setSize);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(sortedSetKey, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(AtomicInteger total) {
+    for (int i = 0; i < setSize; i++) {
+      long count = jedis.zrem(sortedSetKey, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  @Test
+  public void zRemRemovesMembersFromSortedSetAfterPrimaryShutsDown() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    stopNodeWithPrimaryBucketOfTheKey(false);
+
+    doZRemWithRetries(memberScoreMap);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRemWithRetries(Map<String, Double> map) {
+    int maxRetryAttempts = 10;
+    int retryAttempts = 0;
+    while (!zRemWithRetries(map, retryAttempts, maxRetryAttempts)) {
+      retryAttempts++;
+    }
+  }
+
+  private boolean zRemWithRetries(Map<String, Double> map, int retries, int maxRetries) {
+    long removed;
+    try {
+      removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] {}));
+    } catch (JedisClusterMaxAttemptsException e) {
+      if (retries < maxRetries) {
+        return false;
+      }
+      throw e;
+    }
+    assertThat(removed).isEqualTo(map.size());
+    return true;
+  }
+
+  private void doZRem(Map<String, Double> map) {
+    long removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(map.size());
+  }
+
+  @Test
+  @Ignore("Fails due to GEODE-9310")
+  public void zRemCanRemoveMembersFromSortedSetDuringPrimaryIsCrashed() throws Exception {
+    int mapSize = 300;
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(mapSize);
+
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    int number = 10;
+    String memberNotRemoved = baseName + number;
+    memberScoreMap.remove(memberNotRemoved);
+
+    Future<Void> future1 = executor.submit(() -> doZRem(memberScoreMap));
+    Future<Void> future2 = executor.submit(() -> stopNodeWithPrimaryBucketOfTheKey(true));
+
+    future1.get();
+    future2.get();
+
+    GeodeAwaitility.await().until(() -> verifyDataNotExist(memberScoreMap));
+    assertThat(jedis.exists(sortedSetKey)).isTrue();
+  }
+
+  private void verifyDataExist(Map<String, Double> memberScoreMap) {
+    for (String member : memberScoreMap.keySet()) {
+      Double score = jedis.zscore(sortedSetKey, member);
+      assertThat(score).isEqualTo(memberScoreMap.get(member));
+    }
+  }
+
+  private boolean verifyDataNotExist(Map<String, Double> memberScoreMap) {

Review comment:
       Could this be `verifyDataDoesNotExist`?

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
##########
@@ -232,6 +237,45 @@ public void should_calculateSize_closeToROSSize_ofIndividualInstanceWithMultiple
     assertThat(actual).isCloseTo(expected, offset);
   }
 
+  private final String member1 = "member1";
+  private final String member2 = "member2";
+  private final String score1 = "5.55555";
+  private final String score2 = "209030.31";
+
+  @Test
+  public void zremCanRemoveMembersToBeRemoved() {
+    String member3 = "member3";
+    String score3 = "998955255.66361191";
+    RedisSortedSet sortedSet =
+        spy(createRedisSortedSet(score1, member1, score2, member2, score3, member3));
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    RedisKey key = new RedisKey();
+    ArrayList<byte[]> membersToRemove = new ArrayList<>();
+    membersToRemove.add(Coder.stringToBytes("nonExisting"));
+    membersToRemove.add(Coder.stringToBytes(member1));
+    membersToRemove.add(Coder.stringToBytes(member3));
+
+    long removed = sortedSet.zrem(region, key, membersToRemove);
+
+    assertThat(removed).isEqualTo(2);
+    verify(sortedSet).storeChanges(eq(region), eq(key), any(RemsDeltaInfo.class));
+  }

Review comment:
       This is a cool test! I'm not sure it's necessary, though, since all of this should already have been validated by the DUnit tests, but it is kind of cool to verify all of this at a lower level.
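
       For reference, the behaviour the tests in this PR assert (non-existing members are not counted, and the key disappears once its last member is removed) can be sketched in plain Java. This is an illustration only, not Geode's `RedisSortedSet`: `ZRemSketch` and its methods are hypothetical, and a nested `HashMap` stands in for the region.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of ZADD/ZREM; not the Geode implementation.
class ZRemSketch {
  // key -> (member -> score)
  private final Map<String, Map<String, Double>> store = new HashMap<>();

  void zadd(String key, String member, double score) {
    store.computeIfAbsent(key, k -> new HashMap<>()).put(member, score);
  }

  // Returns the number of members actually removed; members that are
  // not in the set are ignored rather than counted.
  long zrem(String key, List<String> membersToRemove) {
    Map<String, Double> members = store.get(key);
    if (members == null) {
      return 0;
    }
    long removed = 0;
    for (String member : membersToRemove) {
      if (members.remove(member) != null) {
        removed++;
      }
    }
    // An empty sorted set does not exist as a key.
    if (members.isEmpty()) {
      store.remove(key);
    }
    return removed;
  }

  boolean exists(String key) {
    return store.containsKey(key);
  }

  public static void main(String[] args) {
    ZRemSketch sketch = new ZRemSketch();
    sketch.zadd("ss_key", "member1", 5.55555);
    sketch.zadd("ss_key", "member2", 209030.31);
    sketch.zadd("ss_key", "member3", 998955255.66361191);

    long removed = sketch.zrem("ss_key", Arrays.asList("nonExisting", "member1", "member3"));
    System.out.println(removed + " removed, key exists: " + sketch.exists("ss_key"));

    removed = sketch.zrem("ss_key", Arrays.asList("member2"));
    System.out.println(removed + " removed, key exists: " + sketch.exists("ss_key"));
  }
}
```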

##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {

Review comment:
       Could this be `zRemCanRemove...`?

##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+    Future<Void> future1 = executor.submit(() -> doZRem(memberScoreMap));
+    Future<Void> future2 = executor.submit(() -> stopNodeWithPrimaryBucketOfTheKey(true));
+
+    future1.get();
+    future2.get();
+
+    GeodeAwaitility.await().until(() -> verifyDataNotExist(memberScoreMap));
+    assertThat(jedis.exists(sortedSetKey)).isTrue();
+  }
+
+  private void verifyDataExist(Map<String, Double> memberScoreMap) {

Review comment:
       Could we call this `verifyDataExists`?

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();

Review comment:
       Since `ConcurrentLoopingThreads` loops for us, maybe we could loop through the full `membersCount` and do each `zrem` individually:
   ```
   new ConcurrentLoopingThreads(membersCount,
       (i) -> doZRem(i, totalRemoved),
       (i) -> doZRem(i, totalRemoved)).run();
   ```

   ```
   private void doZRem(int i, AtomicInteger total) {
     long count = jedis.zrem(SORTED_SET_KEY, baseName + i);
     total.addAndGet((int) count);
   }
   ```
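
To illustrate why the `totalRemoved` assertion still holds with this shape, here is a minimal, self-contained sketch of the same invariant. It is not the Geode test itself: a `ConcurrentSkipListMap` stands in for the sorted set, plain threads stand in for `ConcurrentLoopingThreads`, and `ConcurrentRemoveSketch`, `zrem` and `removeConcurrently` are hypothetical names made up for the example.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrentRemoveSketch {
  static final String BASE_NAME = "member_";

  // Assumption: an in-memory map (member -> score) stands in for the Redis sorted set.
  static final ConcurrentSkipListMap<String, Double> sortedSet = new ConcurrentSkipListMap<>();

  // Stand-in for jedis.zrem(key, member): 1 if the member was removed, 0 if it was absent.
  static long zrem(String member) {
    return sortedSet.remove(member) != null ? 1 : 0;
  }

  // Same shape as the suggested helper: remove one member and accumulate the count.
  static void doZRem(int i, AtomicInteger total) {
    long count = zrem(BASE_NAME + i);
    total.addAndGet((int) count);
  }

  // Two threads each try to remove every member; returns the combined removal count.
  static int removeConcurrently(int membersCount) {
    sortedSet.clear();
    for (int i = 0; i < membersCount; i++) {
      sortedSet.put(BASE_NAME + i, (double) i);
    }

    AtomicInteger totalRemoved = new AtomicInteger();
    Runnable worker = () -> {
      for (int i = 0; i < membersCount; i++) {
        doZRem(i, totalRemoved);
      }
    };

    Thread first = new Thread(worker);
    Thread second = new Thread(worker);
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return totalRemoved.get();
  }

  public static void main(String[] args) {
    int removed = removeConcurrently(1000);
    System.out.println("removed=" + removed + ", empty=" + sortedSet.isEmpty());
  }
}
```

Each member can be removed successfully by only one of the two threads, so the counts sum to `membersCount` regardless of interleaving, which is the property the real test asserts against the server.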

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {

Review comment:
       Could this be `zRemRemovesKeyIfAllMembersInASortedSetAreRemoved`?

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }

Review comment:
       This test is unnecessary because this case is already covered by the `zRemThrowsIfTooFewArguments` test above it.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);

Review comment:
       You can actually just `zadd` all the keys/values in the map directly:
   ```
   jedis.zadd(SORTED_SET_KEY, map);
   ```

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -43,4 +43,10 @@ public long zadd(RedisKey key, List<byte[]> scoresAndMembersToAdd, ZAddOptions o
   public byte[] zscore(RedisKey key, byte[] member) {
     return stripedExecute(key, () -> getRedisSortedSet(key, true).zscore(member));
   }
+
+  @Override
+  public long zrem(RedisKey key, List<byte[]> membersToRemove) {
+    return stripedExecute(key,
+        () -> getRedisSortedSet(key, false).zrem(getRegion(), key, membersToRemove));

Review comment:
       I didn't think about this for ZADD/ZSCORE, but we should also add tests to `AbstractHitsMissesIntegrationTest` verifying whether the hit/miss stats are (or are not) updated for `ZREM` and for each of the other new commands.
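
As a rough illustration of the behaviour such a test would pin down, here is a self-contained sketch. It does not use `AbstractHitsMissesIntegrationTest` or any Geode API. `HitsMissesSketch` and its methods are hypothetical, and it assumes that a read command such as ZSCORE updates the hit/miss counters while a write command such as ZREM leaves them untouched (presumably what the `false` argument to `getRedisSortedSet` in the diff above selects).

```java
import java.util.HashMap;
import java.util.Map;

class HitsMissesSketch {
  // Assumption: key -> (member -> score) stands in for the region of sorted sets.
  private final Map<String, Map<String, Double>> store = new HashMap<>();
  long hits;
  long misses;

  // Looks up a key and, only when updateStats is true, records a hit or a miss.
  private Map<String, Double> lookup(String key, boolean updateStats) {
    Map<String, Double> set = store.get(key);
    if (updateStats) {
      if (set == null) {
        misses++;
      } else {
        hits++;
      }
    }
    return set;
  }

  // Write command: creates the set if needed, never touches the stats.
  long zadd(String key, double score, String member) {
    Map<String, Double> set = store.computeIfAbsent(key, k -> new HashMap<>());
    return set.put(member, score) == null ? 1 : 0;
  }

  // Read command: updates the stats.
  Double zscore(String key, String member) {
    Map<String, Double> set = lookup(key, true);
    return set == null ? null : set.get(member);
  }

  // Write command: does not update the stats; drops the key once the set is empty.
  long zrem(String key, String... members) {
    Map<String, Double> set = lookup(key, false);
    if (set == null) {
      return 0;
    }
    long removed = 0;
    for (String member : members) {
      if (set.remove(member) != null) {
        removed++;
      }
    }
    if (set.isEmpty()) {
      store.remove(key);
    }
    return removed;
  }
}
```

A test in this style records the counters, runs the command against both an existing and a missing key, and then asserts that the counters changed (or did not change) as expected.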

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
##########
@@ -232,6 +237,45 @@ public void should_calculateSize_closeToROSSize_ofIndividualInstanceWithMultiple
     assertThat(actual).isCloseTo(expected, offset);
   }
 
+  private final String member1 = "member1";
+  private final String member2 = "member2";
+  private final String score1 = "5.55555";
+  private final String score2 = "209030.31";
+
+  @Test
+  public void zremCanRemoveMembersToBeRemoved() {
+    String member3 = "member3";
+    String score3 = "998955255.66361191";
+    RedisSortedSet sortedSet =
+        spy(createRedisSortedSet(score1, member1, score2, member2, score3, member3));
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    RedisKey key = new RedisKey();
+    ArrayList<byte[]> membersToRemove = new ArrayList<>();
+    membersToRemove.add(Coder.stringToBytes("nonExisting"));
+    membersToRemove.add(Coder.stringToBytes(member1));
+    membersToRemove.add(Coder.stringToBytes(member3));
+
+    long removed = sortedSet.zrem(region, key, membersToRemove);
+
+    assertThat(removed).isEqualTo(2);
+    verify(sortedSet).storeChanges(eq(region), eq(key), any(RemsDeltaInfo.class));
+  }
+
+  @Test
+  public void memberRemoveCanRemoveMemberInSortedSet() {
+    RedisSortedSet sortedSet = createRedisSortedSet(score1, member1, score2, member2);
+    RedisSortedSet sortedSet2 = createRedisSortedSet(score2, member2);
+    int originalSize = sortedSet.getSizeInBytes();
+
+    byte[] returnValue = sortedSet.memberRemove(Coder.stringToBytes(member1));
+    int removedSize =
+        sortedSet.calculateSizeOfFieldValuePair(Coder.stringToBytes(member1), returnValue);
+
+    assertThat(sortedSet).isEqualTo(sortedSet2);
+    assertThat(returnValue).isEqualTo(Coder.stringToBytes(score1));
+    assertThat(sortedSet.getSizeInBytes()).isEqualTo(originalSize - removedSize);
+  }

Review comment:
       To check that the sizeInBytes is accurate, I'd prefer we use `ReflectionObjectSizer.sizeOf` directly in the tests. I think @nonbinaryprogrammer has been working on a story to fix the sizeable calculations, though. The other assertions in this test seem unnecessary since they are already validated by the integration tests.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(membersCount);
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(SORTED_SET_KEY, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(int memberCounts, AtomicInteger total) {
+    for (int i = 0; i < memberCounts; i++) {
+      long count = jedis.zrem(SORTED_SET_KEY, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void addToSortedSet(Map<String, Double> map) {
+    Set<String> keys = map.keySet();
+    Long count = 0L;
+
+    for (String member : keys) {
+      Double score = map.get(member);
+      Long res = jedis.zadd(SORTED_SET_KEY, score, member);
+      count += res;
+    }
+    assertThat(count).isEqualTo(keys.size());
+  }

Review comment:
       This method seems unnecessary since we can `zadd` all the keys/values in the map directly.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {

Review comment:
       Could this be `zRemCanRemove....`?

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
##########
@@ -116,7 +121,7 @@ public void equals_returnsTrue_givenDifferentEmptySortedSets() {
 
   @Test
   public void zadd_stores_delta_that_is_stable() throws IOException {
-    Region<RedisKey, RedisData> region = uncheckedCast(Mockito.mock(Region.class));
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));

Review comment:
       Thanks for making this better!

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/sortedset/ZRemExecutor.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ZRemExecutor extends AbstractExecutor {
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+    RedisSortedSetCommands redisSortedSetCommands = context.getRedisSortedSetCommands();
+
+    List<byte[]> commandElements = command.getProcessedCommand();
+    RedisKey key = command.getKey();
+    ArrayList<byte[]> membersToRemove =
+        new ArrayList<>(commandElements.subList(2, commandElements.size()));

Review comment:
       We could use `List<byte[]>` for the variable type here.
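
As a small illustration of that suggestion, the sketch below declares the variable by its `List<byte[]>` interface while still copying the `subList` view into a new `ArrayList`. The class and method names (`MemberArgsSketch`, `membersToRemove`) are hypothetical and not part of the PR. The start index of 2 assumes, as in the executor above, that elements 0 and 1 hold the command name and the key.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch; not the PR's ZRemExecutor.
final class MemberArgsSketch {

  // Assumes element 0 is the command name and element 1 is the key,
  // so the members to remove start at index 2.
  static List<byte[]> membersToRemove(List<byte[]> commandElements) {
    // Copy the subList view so the result does not depend on the original list.
    return new ArrayList<>(commandElements.subList(2, commandElements.size()));
  }

  public static void main(String[] args) {
    List<byte[]> elements = Arrays.asList(
        "ZREM".getBytes(StandardCharsets.UTF_8),
        "ss_key".getBytes(StandardCharsets.UTF_8),
        "member_1".getBytes(StandardCharsets.UTF_8),
        "member_2".getBytes(StandardCharsets.UTF_8));

    for (byte[] member : membersToRemove(elements)) {
      System.out.println(new String(member, StandardCharsets.UTF_8));
    }
  }
}
```

Declaring the interface type keeps callers independent of the concrete list implementation, which is presumably the point of the comment.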

##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(memberScoreMap, totalRemoved),
+        (i) -> doZRem1(totalRemoved)).run();

Review comment:
       Since ConcurrentLoopingThreads loops for us, maybe we could loop through the full `setSize` and do the zrem individually:
   ```
   new ConcurrentLoopingThreads(setSize,
       (i) -> doZRem(i, totalRemoved),
       (i) -> doZRem(i, totalRemoved)).run();
   ```
   ```
   private void doZRem(int i, AtomicInteger totalRemoved) {
     long count = jedis.zrem(SORTED_SET_KEY, baseName + i);
     totalRemoved.addAndGet((int) count);
   }
   ```

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -87,7 +87,9 @@ protected void applyDelta(DeltaInfo deltaInfo) {
       membersAddAll(addsDeltaInfo);
     } else {
       RemsDeltaInfo remsDeltaInfo = (RemsDeltaInfo) deltaInfo;
-      membersRemoveAll(remsDeltaInfo);
+      for (byte[] member : remsDeltaInfo.getRemoves()) {
+        memberRemove(member);
+      }

Review comment:
       Could we also update how we apply the delta for adds so that it's consistent with the change to removes, eliminating the `membersAddAll` method and moving that logic into this method?







[GitHub] [geode] pivotal-eshu commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r641076184



##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSortedSetTest.java
##########
@@ -232,6 +237,45 @@ public void should_calculateSize_closeToROSSize_ofIndividualInstanceWithMultiple
     assertThat(actual).isCloseTo(expected, offset);
   }
 
+  private final String member1 = "member1";
+  private final String member2 = "member2";
+  private final String score1 = "5.55555";
+  private final String score2 = "209030.31";
+
+  @Test
+  public void zremCanRemoveMembersToBeRemoved() {
+    String member3 = "member3";
+    String score3 = "998955255.66361191";
+    RedisSortedSet sortedSet =
+        spy(createRedisSortedSet(score1, member1, score2, member2, score3, member3));
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    RedisKey key = new RedisKey();
+    ArrayList<byte[]> membersToRemove = new ArrayList<>();
+    membersToRemove.add(Coder.stringToBytes("nonExisting"));
+    membersToRemove.add(Coder.stringToBytes(member1));
+    membersToRemove.add(Coder.stringToBytes(member3));
+
+    long removed = sortedSet.zrem(region, key, membersToRemove);
+
+    assertThat(removed).isEqualTo(2);
+    verify(sortedSet).storeChanges(eq(region), eq(key), any(RemsDeltaInfo.class));
+  }
+
+  @Test
+  public void memberRemoveCanRemoveMemberInSortedSet() {
+    RedisSortedSet sortedSet = createRedisSortedSet(score1, member1, score2, member2);
+    RedisSortedSet sortedSet2 = createRedisSortedSet(score2, member2);
+    int originalSize = sortedSet.getSizeInBytes();
+
+    byte[] returnValue = sortedSet.memberRemove(Coder.stringToBytes(member1));
+    int removedSize =
+        sortedSet.calculateSizeOfFieldValuePair(Coder.stringToBytes(member1), returnValue);
+
+    assertThat(sortedSet).isEqualTo(sortedSet2);
+    assertThat(returnValue).isEqualTo(Coder.stringToBytes(score1));
+    assertThat(sortedSet.getSizeInBytes()).isEqualTo(originalSize - removedSize);
+  }

Review comment:
       This test is only meant to verify the behavior at the unit level; we want coverage at the lower level where possible.
   
   I added a test case using ReflectionObjectSizer.sizeOf, but it still needs to be changed.







[GitHub] [geode] pivotal-eshu commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r641053183



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSetCommandsFunctionExecutor.java
##########
@@ -43,4 +43,10 @@ public long zadd(RedisKey key, List<byte[]> scoresAndMembersToAdd, ZAddOptions o
   public byte[] zscore(RedisKey key, byte[] member) {
     return stripedExecute(key, () -> getRedisSortedSet(key, true).zscore(member));
   }
+
+  @Override
+  public long zrem(RedisKey key, List<byte[]> membersToRemove) {
+    return stripedExecute(key,
+        () -> getRedisSortedSet(key, false).zrem(getRegion(), key, membersToRemove));

Review comment:
       Added the test coverage.







[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r643146555



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();

Review comment:
       Ideally, the function you pass into `ConcurrentLoopingThreads` shouldn't be doing its own loop. As it is currently set up, the CLT will only loop twice and the second time everything is already gone. Perhaps one slight adjustment to @sabbey37's suggestion is to replace `run()` with `runInLockstep()`. This means that for every iteration of the CLT loop, all threads will wait for completion of a given iteration before moving to the next one. The idea is that it increases the possibility of concurrency errors if they exist. 
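
The lockstep idea can be sketched on its own, outside Geode. The example below is an assumption-laden illustration, not the implementation of `ConcurrentLoopingThreads` or `runInLockstep()`: it uses a plain `CyclicBarrier` so that two threads start each iteration together, and a concurrent set stands in for the sorted set. All names (`LockstepSketch`, `removeInLockstep`) are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone sketch of the lockstep idea; it does not use or
// reproduce Geode's ConcurrentLoopingThreads.
final class LockstepSketch {

  // Two threads each try to remove member i on iteration i, and both wait at a
  // barrier before moving on, so every iteration contends on the same member.
  static int removeInLockstep(int memberCount) {
    Set<String> members = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < memberCount; i++) {
      members.add("member_" + i);
    }

    AtomicInteger totalRemoved = new AtomicInteger();
    CyclicBarrier barrier = new CyclicBarrier(2);

    Runnable worker = () -> {
      try {
        for (int i = 0; i < memberCount; i++) {
          barrier.await(); // start iteration i together
          if (members.remove("member_" + i)) {
            totalRemoved.incrementAndGet();
          }
        }
      } catch (InterruptedException | BrokenBarrierException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    };

    Thread first = new Thread(worker);
    Thread second = new Thread(worker);
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return totalRemoved.get();
  }

  public static void main(String[] args) {
    System.out.println("removed = " + removeInLockstep(1000));
  }
}
```

Because both threads target the same member in every iteration, a removal that is not atomic would show up as a total different from `memberCount`.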







[GitHub] [geode] sabbey37 commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
sabbey37 commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640957582



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();

Review comment:
       I think that's kind of the point though... that we have two threads trying to remove something from the same set at the same time. We want to make sure we don't hit any concurrent modification exceptions or report we've removed the same thing twice. We also can't be sure which thread will run/complete first.
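
A minimal standalone sketch of that check, under stated assumptions: a `ConcurrentHashMap` stands in for the sorted set, and the names (`ConcurrentRemoveSketch`, `removeFromTwoThreads`) are hypothetical rather than taken from the PR. Two free-running threads each try to remove every member; whichever thread finishes first, the combined count should equal the number of members and nothing should remain.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical standalone sketch; a ConcurrentHashMap stands in for the sorted set.
final class ConcurrentRemoveSketch {

  // Returns {totalRemoved, remainingSize}.
  static int[] removeFromTwoThreads(int memberCount) {
    Map<String, Double> sortedSet = new ConcurrentHashMap<>();
    for (int i = 0; i < memberCount; i++) {
      sortedSet.put("member_" + i, (double) i);
    }

    AtomicInteger totalRemoved = new AtomicInteger();
    // Each thread tries to remove every member; a member counts only for the
    // thread whose remove() actually returned its score.
    Runnable remover = () -> {
      for (int i = 0; i < memberCount; i++) {
        if (sortedSet.remove("member_" + i) != null) {
          totalRemoved.incrementAndGet();
        }
      }
    };

    Thread first = new Thread(remover);
    Thread second = new Thread(remover);
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return new int[] {totalRemoved.get(), sortedSet.size()};
  }

  public static void main(String[] args) {
    int[] result = removeFromTwoThreads(1000);
    System.out.println("removed = " + result[0] + ", remaining = " + result[1]);
  }
}
```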







[GitHub] [geode] pivotal-eshu commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r643372692



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();

Review comment:
       Removed the loops inside the methods.







[GitHub] [geode] sabbey37 commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
sabbey37 commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640957582



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();

Review comment:
       I think that's kind of the point though... that we have two threads trying to remove something from the same set at the same time. We want to make sure we don't hit any concurrent modification exceptions or report we've removed the same thing twice. We also can't be sure which thread will run/complete first.
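
       The property described above can be sketched without a Redis server. This is a minimal, hypothetical stand-in (the class and method names are not from the PR): a JDK concurrent set plays the role of the sorted set, and `remove` plays the role of `ZREM`. Two threads try to remove every member, and the combined count of successful removals should equal the member count, since each member can be reported as removed only once.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a concurrent set stands in for the Redis sorted set.
class ConcurrentRemoveSketch {

  // Returns the total number of successful removals reported by both threads.
  public static int removeFromTwoThreads(int memberCount) {
    Set<String> members = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < memberCount; i++) {
      members.add("member_" + i);
    }

    AtomicInteger totalRemoved = new AtomicInteger();
    Runnable remover = () -> {
      for (int i = 0; i < memberCount; i++) {
        // remove() returns true for only one of the two callers per member
        if (members.remove("member_" + i)) {
          totalRemoved.incrementAndGet();
        }
      }
    };

    Thread first = new Thread(remover);
    Thread second = new Thread(remover);
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }

    if (!members.isEmpty()) {
      throw new IllegalStateException("members left behind: " + members.size());
    }
    return totalRemoved.get();
  }

  public static void main(String[] args) {
    System.out.println(removeFromTwoThreads(1000));
  }
}
```

       The sketch checks only the total, not which thread removed which member, because that split depends on scheduling.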







[GitHub] [geode] sabbey37 commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
sabbey37 commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640711077



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(membersCount);
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(SORTED_SET_KEY, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(int memberCounts, AtomicInteger total) {
+    for (int i = 0; i < memberCounts; i++) {
+      long count = jedis.zrem(SORTED_SET_KEY, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void addToSortedSet(Map<String, Double> map) {
+    Set<String> keys = map.keySet();
+    Long count = 0L;
+
+    for (String member : keys) {
+      Double score = map.get(member);
+      Long res = jedis.zadd(SORTED_SET_KEY, score, member);
+      count += res;
+    }
+    assertThat(count).isEqualTo(keys.size());
+  }

Review comment:
       This method seems unnecessary since we can `zadd` all the keys/values in the map directly at the same time.  







[GitHub] [geode] pivotal-eshu commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r641052564



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -87,7 +87,9 @@ protected void applyDelta(DeltaInfo deltaInfo) {
       membersAddAll(addsDeltaInfo);
     } else {
       RemsDeltaInfo remsDeltaInfo = (RemsDeltaInfo) deltaInfo;
-      membersRemoveAll(remsDeltaInfo);
+      for (byte[] member : remsDeltaInfo.getRemoves()) {
+        memberRemove(member);
+      }

Review comment:
       Actually, I decided to keep the removeAll method because it uses the synchronized keyword, which indicates that multiple threads can potentially work on the data set. With removeAll and addAll, no other thread can modify the members concurrently: a batch is applied all or none.
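
       As an illustration of the all-or-none point, here is a minimal sketch that assumes a plain `HashMap` guarded by `synchronized` methods. The names `membersAddAll` and `membersRemoveAll` come from the diff above, but their signatures and bodies here are assumptions, not the actual `RedisSortedSet` code.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of batch operations guarded by the object's monitor.
class SynchronizedMembersSketch {
  private final Map<String, Double> members = new HashMap<>();

  // The whole batch is added while holding the lock.
  public synchronized void membersAddAll(Map<String, Double> adds) {
    members.putAll(adds);
  }

  // The whole batch is removed while holding the lock, so no other
  // synchronized method on this object can observe a half-applied batch.
  public synchronized int membersRemoveAll(List<String> removes) {
    int removed = 0;
    for (String member : removes) {
      if (members.remove(member) != null) {
        removed++;
      }
    }
    return removed;
  }

  public synchronized int size() {
    return members.size();
  }
}
```

       A per-member loop that takes the lock once per call would let another thread interleave between removals, which is the difference the comment is pointing at.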







[GitHub] [geode] pivotal-eshu commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
pivotal-eshu commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r641050306



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();

Review comment:
       At least, that is what I see when running the test with the above-mentioned methods. The second thread is always blocked on the synchronization code in Redis; once that thread is unblocked and working on the key, the first thread is blocked on the key. I think we at least want to test that both threads can remove some of the members in the sorted set.







[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640959165



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();

Review comment:
       The order that the threads execute the commands in `ConcurrentLoopingThreads` is not deterministic. So either one could be 'first' to remove a given entry.
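
       To make the non-determinism concrete, the following sketch records how many members each of two threads removes. It uses only JDK classes and hypothetical names, with a latch to release both threads together; it does not use `ConcurrentLoopingThreads`. The per-thread split can differ from run to run, while the sum stays fixed.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch: per-thread removal counts for two racing threads.
class RemovalSplitSketch {

  // Returns {removedByThread0, removedByThread1}.
  public static int[] removalsPerThread(int memberCount) {
    Set<String> members = ConcurrentHashMap.newKeySet();
    for (int i = 0; i < memberCount; i++) {
      members.add("member_" + i);
    }

    int[] removedBy = new int[2];
    CountDownLatch start = new CountDownLatch(1);
    Thread[] threads = new Thread[2];
    for (int t = 0; t < 2; t++) {
      final int id = t;
      threads[t] = new Thread(() -> {
        try {
          start.await(); // wait until both threads have been started
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        for (int i = 0; i < memberCount; i++) {
          if (members.remove("member_" + i)) {
            removedBy[id]++; // each thread writes only its own slot
          }
        }
      });
      threads[t].start();
    }
    start.countDown();

    for (Thread thread : threads) {
      try {
        thread.join(); // join makes each thread's writes visible here
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return removedBy;
  }

  public static void main(String[] args) {
    int[] counts = removalsPerThread(1000);
    System.out.println(counts[0] + " + " + counts[1]);
  }
}
```

       Only the sum is safe to assert on. An assertion that both threads removed at least one member could fail if one thread happens to finish before the other gets scheduled.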







[GitHub] [geode] sabbey37 commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
sabbey37 commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640710185



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);

Review comment:
       You can actually just `zadd` all the keys/values in the map directly at the same time:
   ```
   jedis.zadd(SORTED_SET_KEY, map);
   ```







[GitHub] [geode] DonalEvans commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640912873



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(membersCount);
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(SORTED_SET_KEY, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(int memberCounts, AtomicInteger total) {
+    for (int i = 0; i < memberCounts; i++) {
+      long count = jedis.zrem(SORTED_SET_KEY, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void addToSortedSet(Map<String, Double> map) {
+    Set<String> keys = map.keySet();
+    Long count = 0L;
+
+    for (String member : keys) {
+      Double score = map.get(member);
+      Long res = jedis.zadd(SORTED_SET_KEY, score, member);
+      count += res;
+    }
+    assertThat(count).isEqualTo(keys.size());
+  }
+
+  private Map<String, Double> makeMemberScoreMap(int membersCount) {
+    int baseScore = 0;

Review comment:
       This variable is redundant and can probably be removed.

##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();

Review comment:
       It should not be necessary to call `stop()` on members, as this is handled by the `RedisClusterStartupRule`.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String SORTED_SET_KEY = "ss_key";
+  private static final int INITIAL_MEMBER_COUNT = 5;
+
+  @Before
+  public void setUp() {
+    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    flushAll();
+    jedis.close();
+  }
+
+  @Test
+  public void zRemThrowsIfTooFewArguments() {
+    assertAtLeastNArgs(jedis, Protocol.Command.ZREM, 2);
+  }
+
+  @Test
+  public void zRemThrowsIfGivenOnlyKey() {
+    assertThatThrownBy(() -> jedis.zrem("key")).hasMessageContaining("wrong number of arguments");
+  }
+
+  @Test
+  public void zRemThrowsErrorIfKeyIsNotASortedSet() {
+    String key = "key";
+    String member = "member1";
+    jedis.sadd(key, member);
+
+    assertThatThrownBy(() -> jedis.zrem(key, member))
+        .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
+
+  @Test
+  public void zRemDoesNotRemoveNonExistingMember() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    addToSortedSet(map);
+
+    String nonExistingMember = "nonExisting";
+    long result = jedis.zrem(SORTED_SET_KEY, nonExistingMember);
+
+    assertThat(result).isEqualTo(0);
+  }
+
+  @Test
+  public void zRemCanRemoveAMemberInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String memberToRemove = baseName + 1;
+    Long removed = jedis.zrem(SORTED_SET_KEY, memberToRemove);
+    assertThat(removed).isEqualTo(1);
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      if (member.equals(memberToRemove)) {
+        assertThat(score).isNull();
+      } else {
+        assertThat(score).isNotNull();
+      }
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isTrue();
+  }
+
+  @Test
+  public void zRemRemovesKeyIfRemoveAllMembersInASortedSet() {
+    Map<String, Double> map = makeMemberScoreMap(INITIAL_MEMBER_COUNT);
+    Set<String> keys = map.keySet();
+    addToSortedSet(map);
+
+    String[] membersToRemove = new String[keys.size()];
+    Long removed = jedis.zrem(SORTED_SET_KEY, keys.toArray(membersToRemove));
+    assertThat(removed).isEqualTo(keys.size());
+
+    for (String member : keys) {
+      Double score = jedis.zscore(SORTED_SET_KEY, member);
+      assertThat(score).isNull();
+    }
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyInASortedSet() {
+    int membersCount = 1000;
+    Map<String, Double> map = makeMemberScoreMap(membersCount);
+    addToSortedSet(map);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(map, totalRemoved),
+        (i) -> doZRem1(membersCount, totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(membersCount);
+    assertThat(jedis.exists(SORTED_SET_KEY)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(SORTED_SET_KEY, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(int memberCounts, AtomicInteger total) {

Review comment:
       Could these methods be named a little more descriptively? Perhaps something like "doZRemOnAllKeysInMap" and "doMultipleZRem"? Also, the argument `memberCounts` could be better named as something like "numToRemove".
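
To illustrate the naming suggestion, here is a minimal, self-contained sketch. It assumes a `ConcurrentHashMap` as a stand-in for the sorted set and a hypothetical `zrem` helper in place of `jedis.zrem`; the class name and `removeConcurrently` are made up for the example and are not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class ZRemNamingSketch {
  static final String BASE_NAME = "member_";

  // Hypothetical stand-in for jedis.zrem(key, member): 1 if the member was present, else 0.
  static long zrem(Map<String, Double> sortedSet, String member) {
    return sortedSet.remove(member) != null ? 1L : 0L;
  }

  // Suggested name for doZRem: issues one removal per key of the member/score map.
  static void doZRemOnAllKeysInMap(Map<String, Double> sortedSet,
      Map<String, Double> map, AtomicInteger total) {
    for (String member : map.keySet()) {
      total.addAndGet((int) zrem(sortedSet, member));
    }
  }

  // Suggested name for doZRem1: issues numToRemove removals using generated member names.
  static void doMultipleZRem(Map<String, Double> sortedSet, int numToRemove,
      AtomicInteger total) {
    for (int i = 0; i < numToRemove; i++) {
      total.addAndGet((int) zrem(sortedSet, BASE_NAME + i));
    }
  }

  // Runs both helpers concurrently; returns the total removal count, or -1 if members remain.
  static int removeConcurrently(int membersCount) {
    Map<String, Double> map = new HashMap<>();
    for (int i = 0; i < membersCount; i++) {
      map.put(BASE_NAME + i, (double) i);
    }
    Map<String, Double> sortedSet = new ConcurrentHashMap<>(map);
    AtomicInteger total = new AtomicInteger();

    Thread first = new Thread(() -> doZRemOnAllKeysInMap(sortedSet, map, total));
    Thread second = new Thread(() -> doMultipleZRem(sortedSet, membersCount, total));
    first.start();
    second.start();
    try {
      first.join();
      second.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return sortedSet.isEmpty() ? total.get() : -1;
  }
}
```

Because each member can only be removed once, the two threads together should report exactly `membersCount` removals, which is the property the test asserts.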

##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(memberScoreMap, totalRemoved),
+        (i) -> doZRem1(totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(setSize);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(sortedSetKey, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(AtomicInteger total) {

Review comment:
       Could these methods be renamed to something a little more descriptive? Maybe "doZRemOnAllKeysInMap" and "doZRemOnAllMembers"?

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/sortedset/AbstractZRemIntegrationTest.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.Protocol;
+
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+
+public abstract class AbstractZRemIntegrationTest implements RedisIntegrationTest {
+  private JedisCluster jedis;
+  private final String baseName = "member_";
+
+  private static final int REDIS_CLIENT_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());

Review comment:
       This constant is already defined in `RedisIntegrationTest` with the same value, so it doesn't need to be redefined here. The same redefinition seems to be present in a lot of classes that implement `RedisIntegrationTest`, so maybe a ticket could be filed to clean up all those uses.
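
As a small illustration of why the redefinition is unnecessary: a field declared in a Java interface is implicitly `public static final` and is visible by its simple name in every implementing class. The sketch below uses hypothetical stand-in types and an invented timeout value, not the real `RedisIntegrationTest`.

```java
final class InheritedConstantSketch {
  // Hypothetical stand-in for RedisIntegrationTest; the value here is invented.
  interface IntegrationTestBase {
    int REDIS_CLIENT_TIMEOUT = 30_000; // implicitly public static final
  }

  // The implementing class can use the constant without declaring its own copy.
  static final class ZRemTestSketch implements IntegrationTestBase {
    int clientTimeout() {
      return REDIS_CLIENT_TIMEOUT;
    }
  }
}
```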

##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/sortedset/ZRemDUnitTest.java
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.executor.sortedset;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.DEFAULT_MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+import redis.clients.jedis.exceptions.JedisClusterMaxAttemptsException;
+
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.PartitionedRegionHelper;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.redis.internal.netty.Coder;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class ZRemDUnitTest implements Serializable {
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  private transient JedisCluster jedis;
+  private MemberVM locator;
+  private MemberVM server1;
+  private MemberVM server2;
+  private MemberVM server3;
+  private String sortedSetKey = "key";
+  private final String baseName = "member1-";
+  private final int setSize = 1000;
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  @Before
+  public void setup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, DEFAULT_MAX_WAIT_TIME_RECONNECT);
+
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    server1 = clusterStartUp.startRedisVM(1, locator.getPort());
+    server2 = clusterStartUp.startRedisVM(2, locator.getPort());
+    server3 = clusterStartUp.startRedisVM(3, locator.getPort());
+
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+
+    jedis = new JedisCluster(new HostAndPort(LOCAL_HOST, redisServerPort), JEDIS_TIMEOUT);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+
+    locator.stop();
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void zRemCanRemoveMembersFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    long removed = jedis.zrem(sortedSetKey, memberScoreMap.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(setSize);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  @Test
+  public void zRemCanRemovesMembersConcurrentlyFromSortedSet() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    AtomicInteger totalRemoved = new AtomicInteger();
+    new ConcurrentLoopingThreads(2,
+        (i) -> doZRem(memberScoreMap, totalRemoved),
+        (i) -> doZRem1(totalRemoved)).run();
+
+    assertThat(totalRemoved.get()).isEqualTo(setSize);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRem(Map<String, Double> map, AtomicInteger total) {
+    Set<String> keys = map.keySet();
+    for (String key : keys) {
+      long count = jedis.zrem(sortedSetKey, key);
+      total.addAndGet((int) count);
+    }
+  }
+
+  private void doZRem1(AtomicInteger total) {
+    for (int i = 0; i < setSize; i++) {
+      long count = jedis.zrem(sortedSetKey, baseName + i);
+      total.addAndGet((int) count);
+    }
+  }
+
+  @Test
+  public void zRemRemovesMembersFromSortedSetAfterPrimaryShutsDown() {
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(setSize);
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    stopNodeWithPrimaryBucketOfTheKey(false);
+
+    doZRemWithRetries(memberScoreMap);
+
+    verifyDataNotExist(memberScoreMap);
+    assertThat(jedis.exists(sortedSetKey)).isFalse();
+  }
+
+  private void doZRemWithRetries(Map<String, Double> map) {
+    int maxRetryAttempts = 10;
+    int retryAttempts = 0;
+    while (!zRemWithRetries(map, retryAttempts, maxRetryAttempts)) {
+      retryAttempts++;
+    }
+  }
+
+  private boolean zRemWithRetries(Map<String, Double> map, int retries, int maxRetries) {
+    long removed;
+    try {
+      removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] {}));
+    } catch (JedisClusterMaxAttemptsException e) {
+      if (retries < maxRetries) {
+        return false;
+      }
+      throw e;
+    }
+    assertThat(removed).isEqualTo(map.size());
+    return true;
+  }
+
+  private void doZRem(Map<String, Double> map) {
+    long removed = jedis.zrem(sortedSetKey, map.keySet().toArray(new String[] {}));
+    assertThat(removed).isEqualTo(map.size());
+  }
+
+  @Test
+  @Ignore("Fails due to GEODE-9310")
+  public void zRemCanRemoveMembersFromSortedSetDuringPrimaryIsCrashed() throws Exception {
+    int mapSize = 300;
+    Map<String, Double> memberScoreMap = makeMemberScoreMap(mapSize);
+
+    jedis.zadd(sortedSetKey, memberScoreMap);
+    verifyDataExist(memberScoreMap);
+
+    int number = 10;
+    String memberNotRemoved = baseName + number;
+    memberScoreMap.remove(memberNotRemoved);
+
+    Future<Void> future1 = executor.submit(() -> doZRem(memberScoreMap));
+    Future<Void> future2 = executor.submit(() -> stopNodeWithPrimaryBucketOfTheKey(true));
+
+    future1.get();
+    future2.get();
+
+    GeodeAwaitility.await().until(() -> verifyDataNotExist(memberScoreMap));
+    assertThat(jedis.exists(sortedSetKey)).isTrue();
+  }
+
+  private void verifyDataExist(Map<String, Double> memberScoreMap) {
+    for (String member : memberScoreMap.keySet()) {
+      Double score = jedis.zscore(sortedSetKey, member);
+      assertThat(score).isEqualTo(memberScoreMap.get(member));
+    }
+  }
+
+  private boolean verifyDataNotExist(Map<String, Double> memberScoreMap) {
+    try {
+      for (String member : memberScoreMap.keySet()) {
+        Double score = jedis.zscore(sortedSetKey, member);
+        assertThat(score).isNull();
+      }
+    } catch (JedisClusterMaxAttemptsException e) {
+      return false;
+    }
+    return true;
+  }
+
+  private void stopNodeWithPrimaryBucketOfTheKey(boolean isCrash) {
+    int numOfServers = 4;

Review comment:
       Rather than hard-coding this value, it might be better to add the servers to a `List<MemberVM>` in the `setup()` method and iterate through that here.
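
A rough sketch of the idea, under stated assumptions: `Server` is a hypothetical stand-in for `MemberVM`, the `hostsPrimary` flag replaces whatever lookup the real method performs to find the primary bucket, and the `isCrash` parameter is left out. None of this is taken from the PR beyond the method names.

```java
import java.util.ArrayList;
import java.util.List;

final class ServerListSketch {
  // Hypothetical stand-in for MemberVM, reduced to what the sketch needs.
  static final class Server {
    final int vmId;
    final boolean hostsPrimary;
    boolean stopped;

    Server(int vmId, boolean hostsPrimary) {
      this.vmId = vmId;
      this.hostsPrimary = hostsPrimary;
    }

    void stop() {
      stopped = true;
    }
  }

  // Filled once in setup(), so the server count is never hard-coded elsewhere.
  final List<Server> servers = new ArrayList<>();

  void setup(int primaryVmId, int... vmIds) {
    for (int vmId : vmIds) {
      servers.add(new Server(vmId, vmId == primaryVmId));
    }
  }

  // Iterates the list instead of looping up to a fixed numOfServers.
  // Returns the VM id of the server that was stopped, or -1 if none hosts the primary.
  int stopNodeWithPrimaryBucketOfTheKey() {
    for (Server server : servers) {
      if (server.hostsPrimary) {
        server.stop();
        return server.vmId;
      }
    }
    return -1;
  }
}
```

With this shape, adding or removing a server only touches `setup()`.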







[GitHub] [geode] sabbey37 commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
sabbey37 commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r640723961



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -87,7 +87,9 @@ protected void applyDelta(DeltaInfo deltaInfo) {
       membersAddAll(addsDeltaInfo);
     } else {
       RemsDeltaInfo remsDeltaInfo = (RemsDeltaInfo) deltaInfo;
-      membersRemoveAll(remsDeltaInfo);
+      for (byte[] member : remsDeltaInfo.getRemoves()) {
+        memberRemove(member);
+      }

Review comment:
       Could we also update the way we apply the delta for adds so that it's consistent with how the removes now work? That would mean eliminating the `membersAddAll` method and moving its logic into this method.
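
A sketch of the symmetric shape this might take. The real `AddsDeltaInfo` API is not shown in this diff, so the classes below are hypothetical stand-ins: members are `String` rather than `byte[]`, the adds are assumed to be a member-to-score map, and `memberAdd` is an assumed counterpart to `memberRemove`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ApplyDeltaSketch {
  interface DeltaInfo {}

  // Hypothetical: the real AddsDeltaInfo may carry its data differently.
  static final class AddsDeltaInfo implements DeltaInfo {
    final Map<String, Double> adds;

    AddsDeltaInfo(Map<String, Double> adds) {
      this.adds = adds;
    }
  }

  static final class RemsDeltaInfo implements DeltaInfo {
    final List<String> removes;

    RemsDeltaInfo(List<String> removes) {
      this.removes = removes;
    }
  }

  final Map<String, Double> members = new LinkedHashMap<>();

  void memberAdd(String member, double score) {
    members.put(member, score);
  }

  void memberRemove(String member) {
    members.remove(member);
  }

  // Both branches loop over the delta and call the single-member method,
  // so no separate membersAddAll / membersRemoveAll helpers are needed.
  void applyDelta(DeltaInfo deltaInfo) {
    if (deltaInfo instanceof AddsDeltaInfo) {
      for (Map.Entry<String, Double> entry : ((AddsDeltaInfo) deltaInfo).adds.entrySet()) {
        memberAdd(entry.getKey(), entry.getValue());
      }
    } else {
      for (String member : ((RemsDeltaInfo) deltaInfo).removes) {
        memberRemove(member);
      }
    }
  }
}
```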







[GitHub] [geode] sabbey37 commented on a change in pull request #6524: GEODE-9318: Implement ZREM command.

Posted by GitBox <gi...@apache.org>.
sabbey37 commented on a change in pull request #6524:
URL: https://github.com/apache/geode/pull/6524#discussion_r643118724



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSortedSet.java
##########
@@ -87,7 +87,9 @@ protected void applyDelta(DeltaInfo deltaInfo) {
       membersAddAll(addsDeltaInfo);
     } else {
       RemsDeltaInfo remsDeltaInfo = (RemsDeltaInfo) deltaInfo;
-      membersRemoveAll(remsDeltaInfo);
+      for (byte[] member : remsDeltaInfo.getRemoves()) {
+        memberRemove(member);
+      }

Review comment:
       Good call.



