You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ri...@apache.org on 2022/04/06 15:10:23 UTC
[geode] branch develop updated: GEODE-10121: Fix transactional redis commands (#7513)
This is an automated email from the ASF dual-hosted git repository.
ringles pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 141d43dc67 GEODE-10121: Fix transactional redis commands (#7513)
141d43dc67 is described below
commit 141d43dc67452af4940c24203719e6193be42300
Author: Bala Kaza Venkata <43...@users.noreply.github.com>
AuthorDate: Wed Apr 6 11:10:17 2022 -0400
GEODE-10121: Fix transactional redis commands (#7513)
* GEODE-10121: Fix transactional redis commands
The MSET and SMOVE commands are meant to be transactional, but they have not
been behaving transactionally. This commit fixes both commands so that they do.
Authored-by: Bala Kaza Venkata <bk...@vmware.com>
---
.../commands/executor/list/RPopLPushDUnitTest.java | 121 +++++----
.../commands/executor/set/SMoveDunitTest.java | 276 +++++++++++++++++++++
.../commands/executor/string/MSetDUnitTest.java | 93 +++++++
.../list/AbstractRPopLPushIntegrationTest.java | 6 +
.../executor/set/AbstractSMoveIntegrationTest.java | 20 ++
.../commands/executor/string/SetExecutor.java | 3 +-
.../redis/internal/data/AbstractRedisData.java | 3 +-
.../geode/redis/internal/data/NullRedisData.java | 5 +
.../geode/redis/internal/data/RedisData.java | 2 +
.../geode/redis/internal/data/RedisList.java | 25 +-
.../apache/geode/redis/internal/data/RedisSet.java | 29 ++-
11 files changed, 525 insertions(+), 58 deletions(-)
diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java
index e038d2c226..fb338df31d 100644
--- a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java
+++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -32,19 +33,18 @@ import java.util.stream.IntStream;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
-import org.apache.geode.cache.Region;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
import org.apache.geode.redis.internal.data.RedisData;
import org.apache.geode.redis.internal.data.RedisKey;
-import org.apache.geode.redis.internal.data.RedisList;
-import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@@ -52,7 +52,7 @@ import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class RPopLPushDUnitTest {
public static final String KEY_1 = "key1";
public static final String KEY_2 = "key2";
- public static final String THROWING_REDIS_LIST_EXCEPTION = "to be ignored";
+ public static final String THROWING_CACHE_WRITER_EXCEPTION = "to be ignored";
@Rule
public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule();
@@ -142,51 +142,87 @@ public class RPopLPushDUnitTest {
future3.get();
}
- @Ignore("GEODE-10121")
@Test
public void rpoplpush_isTransactional() {
- String hashTag = "{" + clusterStartUp.getKeyOnServer("tag", 1) + "}";
+ IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
- // Create two real RedisList entries
- String sourceKey = hashTag + KEY_1;
- String[] sourceElements = {"sourceElement1", "sourceElement2"};
- jedis.lpush(sourceKey, sourceElements);
- String destinationKey = hashTag + KEY_2;
- String destinationElement = "destinationElement";
- jedis.lpush(destinationKey, destinationElement);
+ int primaryVMIndex = 1;
+ final String tag = "{" + clusterStartUp.getKeyOnServer("tag", primaryVMIndex) + "}";
+
+ final String sourceKey = tag + KEY_1;
+ int sourceSize = 2;
+ List<String> initialElements = makeInitialElementsList(sourceSize);
+ jedis.lpush(sourceKey, initialElements.toArray(new String[0]));
- String throwingRedisListKey = hashTag + "ThrowingRedisList";
- String throwingListElement = "shouldNotMove";
+ String throwingKey = tag + "ThrowingRedisString";
+ int destinationSize = 2;
+ List<String> elementsForThrowingKey = makeInitialElementsList(sourceSize);
+ jedis.lpush(throwingKey, elementsForThrowingKey.toArray(new String[0]));
- // Put a test version of RedisList directly into the region that throws if rpop() or lpush() are
- // called on it
+ // Install a cache writer that will throw an exception if a key with a name equal to throwingKey
+ // is updated or created
clusterStartUp.getMember(1).invoke(() -> {
- RedisKey throwingKey = new RedisKey(throwingRedisListKey.getBytes(StandardCharsets.UTF_8));
- ThrowingRedisList throwingRedisList = new ThrowingRedisList();
- throwingRedisList.elementInsert(throwingListElement.getBytes(StandardCharsets.UTF_8), 0);
- ClusterStartupRule.getCache().getRegion(DEFAULT_REDIS_REGION_NAME).put(throwingKey,
- throwingRedisList);
+ RedisClusterStartupRule.getCache()
+ .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+ .getAttributesMutator()
+ .setCacheWriter(new RPopLPushDUnitTest.ThrowingCacheWriter(throwingKey));
});
- IgnoredException.addIgnoredException(THROWING_REDIS_LIST_EXCEPTION);
+ assertThatThrownBy(
+ () -> jedis.rpoplpush(sourceKey, throwingKey))
+ .hasMessage(SERVER_ERROR_MESSAGE);
- // Test with an exception being thrown from the source RedisList
- assertThatThrownBy(() -> jedis.rpoplpush(throwingRedisListKey, destinationKey))
- .hasMessage(SERVER_ERROR_MESSAGE);
-
- assertThat(jedis.lrange(throwingRedisListKey, 0, -1)).containsExactly(throwingListElement);
- assertThat(jedis.lrange(destinationKey, 0, -1)).containsExactly(destinationElement);
+ List<String> reversedInitialElements = new ArrayList<>(initialElements);
+ Collections.reverse(reversedInitialElements);
- // Test with an exception being thrown from the destination RedisList
- assertThatThrownBy(() -> jedis.rpoplpush(sourceKey, throwingRedisListKey))
- .hasMessage(SERVER_ERROR_MESSAGE);
+ List<String> reversedElementsForThrowingKey = new ArrayList<>(elementsForThrowingKey);
+ Collections.reverse(reversedElementsForThrowingKey);
- assertThat(jedis.lrange(sourceKey, 0, -1)).containsExactlyInAnyOrder(sourceElements);
- assertThat(jedis.lrange(throwingRedisListKey, 0, -1)).containsExactly(throwingRedisListKey);
+ // Assert rpoplpush has not happened
+ assertThat(jedis.lrange(sourceKey, 0, -1)).containsExactlyElementsOf(reversedInitialElements);
+ assertThat(jedis.lrange(throwingKey, 0, -1))
+ .containsExactlyElementsOf(reversedElementsForThrowingKey);
IgnoredException.removeAllExpectedExceptions();
}
+ private static class ThrowingCacheWriter implements CacheWriter<RedisKey, RedisData> {
+ private final byte[] keyBytes;
+
+ ThrowingCacheWriter(String key) {
+ keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void beforeUpdate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+ if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+ throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+ }
+ }
+
+ @Override
+ public void beforeCreate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+ if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+ throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+ }
+ }
+
+ @Override
+ public void beforeDestroy(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeRegionDestroy(RegionEvent<RedisKey, RedisData> event)
+ throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeRegionClear(RegionEvent<RedisKey, RedisData> event)
+ throws CacheWriterException {}
+ }
+
private List<String> getHashTagsForEachServer() {
List<String> hashTags = new ArrayList<>();
hashTags.add("{" + clusterStartUp.getKeyOnServer("tag", 1) + "}");
@@ -248,17 +284,4 @@ public class RPopLPushDUnitTest {
}
}
}
-
- static class ThrowingRedisList extends RedisList {
- @Override
- public long lpush(ExecutionHandlerContext context, List<byte[]> elementsToAdd, RedisKey key,
- boolean onlyIfExists) {
- throw new RuntimeException(THROWING_REDIS_LIST_EXCEPTION);
- }
-
- @Override
- public byte[] rpop(Region<RedisKey, RedisData> region, RedisKey key) {
- throw new RuntimeException(THROWING_REDIS_LIST_EXCEPTION);
- }
- }
}
diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/set/SMoveDunitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/set/SMoveDunitTest.java
new file mode 100644
index 0000000000..edc9e2d790
--- /dev/null
+++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/set/SMoveDunitTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.set;
+
+import static org.apache.geode.redis.internal.RedisConstants.SERVER_ERROR_MESSAGE;
+import static org.apache.geode.redis.internal.services.RegionProvider.DEFAULT_REDIS_REGION_NAME;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class SMoveDunitTest {
+
+ public static final String THROWING_CACHE_WRITER_EXCEPTION = "to be ignored";
+
+ @Rule
+ public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule();
+
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ public static final String KEY_1 = "key1";
+ public static final String KEY_2 = "key2";
+ private static JedisCluster jedis;
+
+ @Before
+ public void testSetup() {
+ MemberVM locator = clusterStartUp.startLocatorVM(0);
+ clusterStartUp.startRedisVM(1, locator.getPort());
+ clusterStartUp.startRedisVM(2, locator.getPort());
+ clusterStartUp.startRedisVM(3, locator.getPort());
+ int redisServerPort = clusterStartUp.getRedisPort(1);
+ jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort), 20_000);
+ clusterStartUp.flushAll();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ jedis.close();
+ }
+
+
+ @Test
+ public void shouldDistributeDataAmongCluster_andRetainDataAfterServerCrash() {
+ int primaryVMIndex = 1;
+ final String tag = "{" + clusterStartUp.getKeyOnServer("tag", primaryVMIndex) + "}";
+ final String sourceKey = tag + KEY_1;
+ final String destinationKey = tag + KEY_2;
+
+
+ final int elementsToMove = 5;
+ final int initialElementCount = elementsToMove * 2;
+
+ List<String> members = makeMemberList(initialElementCount, "member1-");
+
+ jedis.sadd(sourceKey, members.toArray(new String[] {}));
+
+ // Move half the elements from the source set to the destination
+ for (int i = 0; i < elementsToMove; ++i) {
+ assertThat(jedis.smove(sourceKey, destinationKey, members.get(i))).isEqualTo(1);
+ }
+
+ clusterStartUp.crashVM(primaryVMIndex); // kill primary server
+
+ assertThat(jedis.smembers(sourceKey))
+ .containsExactlyInAnyOrderElementsOf(members.subList(elementsToMove, initialElementCount));
+ assertThat(jedis.smembers(destinationKey))
+ .containsExactlyInAnyOrderElementsOf(members.subList(0, elementsToMove));
+ }
+
+ @Test
+ public void givenBucketsMovedDuringSMove_thenOperationsAreNotLostOrDuplicated()
+ throws InterruptedException, ExecutionException {
+
+ final AtomicBoolean continueRunning = new AtomicBoolean(true);
+ final List<String> hashTags = getHashTagsForEachServer();
+
+ List<String> members1 = makeMemberList(10, "member1-");
+ List<String> members2 = makeMemberList(10, "member2-");
+ List<String> members3 = makeMemberList(10, "member3-");
+
+ jedis.sadd(hashTags.get(0) + KEY_1, members1.toArray(new String[] {}));
+ jedis.sadd(hashTags.get(1) + KEY_1, members2.toArray(new String[] {}));
+ jedis.sadd(hashTags.get(2) + KEY_1, members3.toArray(new String[] {}));
+
+ Future<Void> future1 = executor.runAsync(() -> repeatSMove(hashTags.get(0),
+ members1, continueRunning));
+ Future<Void> future2 = executor.runAsync(() -> repeatSMove(hashTags.get(1),
+ members2, continueRunning));
+ Future<Void> future3 =
+ executor.runAsync(() -> repeatSMoveWithSameSourceAndDest(hashTags.get(2),
+ members3, continueRunning));
+
+ for (int i = 0; i < 25 && continueRunning.get(); i++) {
+ clusterStartUp.moveBucketForKey(hashTags.get(i % hashTags.size()));
+ Thread.sleep(200);
+ }
+
+ continueRunning.set(false);
+
+ future1.get();
+ future2.get();
+ future3.get();
+ }
+
+ @Test
+ public void smove_isTransactional() {
+ IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
+
+ int primaryVMIndex = 1;
+ final String tag = "{" + clusterStartUp.getKeyOnServer("tag", primaryVMIndex) + "}";
+
+ final String sourceKey = tag + KEY_1;
+ int sourceSize = 2;
+ List<String> members = makeMemberList(sourceSize, "member1-");
+ jedis.sadd(sourceKey, members.toArray(new String[] {}));
+
+ String throwingKey = tag + "ThrowingRedisString";
+ int destinationSize = 2;
+ List<String> membersForThrowingKey = makeMemberList(2, "ThrowingRedisStringValue");
+ jedis.sadd(throwingKey, membersForThrowingKey.toArray(new String[] {}));
+
+ // Install a cache writer that will throw an exception if a key with a name equal to throwingKey
+ // is updated or created
+ clusterStartUp.getMember(1).invoke(() -> {
+ RedisClusterStartupRule.getCache()
+ .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+ .getAttributesMutator()
+ .setCacheWriter(new SMoveDunitTest.ThrowingCacheWriter(throwingKey));
+ });
+
+ String memberToRemove = "member1-0";
+
+ assertThatThrownBy(
+ () -> jedis.smove(sourceKey, throwingKey, memberToRemove))
+ .hasMessage(SERVER_ERROR_MESSAGE);
+
+ // Assert smove has not happened
+ assertThat(jedis.smembers(sourceKey)).containsExactlyInAnyOrderElementsOf(members);
+ assertThat(jedis.smembers(throwingKey))
+ .containsExactlyInAnyOrderElementsOf(membersForThrowingKey);
+
+ IgnoredException.removeAllExpectedExceptions();
+ }
+
+ private static class ThrowingCacheWriter implements CacheWriter<RedisKey, RedisData> {
+ private final byte[] keyBytes;
+
+ ThrowingCacheWriter(String key) {
+ keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void beforeUpdate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+ if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+ throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+ }
+ }
+
+ @Override
+ public void beforeCreate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+ if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+ throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+ }
+ }
+
+ @Override
+ public void beforeDestroy(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeRegionDestroy(RegionEvent<RedisKey, RedisData> event)
+ throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeRegionClear(RegionEvent<RedisKey, RedisData> event)
+ throws CacheWriterException {}
+ }
+
+ private void repeatSMove(String hashTag, List<String> initialElements,
+ AtomicBoolean continueRunning) {
+ String source = hashTag + KEY_1;
+ String destination = hashTag + KEY_2;
+
+ while (continueRunning.get()) {
+ for (int i = 0; i < initialElements.size(); i++) {
+ int movedIndex = (initialElements.size() - 1) - i;
+ assertThat(jedis.smove(source, destination, initialElements.get(movedIndex))).isEqualTo(1);
+
+ // Confirm we moved the correct element
+ assertThat(jedis.smembers(source))
+ .containsExactlyInAnyOrderElementsOf(initialElements.subList(0, movedIndex));
+ assertThat(jedis.smembers(destination)).containsExactlyInAnyOrderElementsOf(
+ initialElements.subList(movedIndex, initialElements.size()));
+ }
+
+ // All elements have been moved
+ assertThat(jedis.exists(source)).isFalse();
+
+ // Swap the source and destination keys
+ String tmp = source;
+ source = destination;
+ destination = tmp;
+ }
+ }
+
+ private void repeatSMoveWithSameSourceAndDest(String hashTag, List<String> initialElements,
+ AtomicBoolean continueRunning) {
+ String key = hashTag + KEY_1;
+
+ while (continueRunning.get()) {
+ for (String element : initialElements) {
+ assertThat(jedis.smove(key, key, element)).isEqualTo(1);
+ Collections.rotate(initialElements, 1);
+ assertThat(jedis.smembers(key)).containsExactlyInAnyOrderElementsOf(initialElements);
+ }
+ }
+ }
+
+ private List<String> getHashTagsForEachServer() {
+ List<String> hashTags = new ArrayList<>();
+ hashTags.add("{" + clusterStartUp.getKeyOnServer("tag", 1) + "}");
+ hashTags.add("{" + clusterStartUp.getKeyOnServer("tag", 2) + "}");
+ hashTags.add("{" + clusterStartUp.getKeyOnServer("tag", 3) + "}");
+ return hashTags;
+ }
+
+ private List<String> makeMemberList(int setSize, String baseString) {
+ List<String> members = new ArrayList<>();
+ for (int i = 0; i < setSize; i++) {
+ members.add(baseString + i);
+ }
+ return members;
+ }
+}
diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/MSetDUnitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/MSetDUnitTest.java
index 44394cea7d..588b29149b 100644
--- a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/MSetDUnitTest.java
+++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/MSetDUnitTest.java
@@ -15,11 +15,16 @@
package org.apache.geode.redis.internal.commands.executor.string;
+import static org.apache.geode.redis.internal.RedisConstants.SERVER_ERROR_MESSAGE;
import static org.apache.geode.redis.internal.SystemPropertyBasedRedisConfiguration.GEODE_FOR_REDIS_PORT;
+import static org.apache.geode.redis.internal.services.RegionProvider.DEFAULT_REDIS_REGION_NAME;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,11 +39,18 @@ import org.junit.Test;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
@@ -54,6 +66,8 @@ public class MSetDUnitTest {
@ClassRule
public static ExecutorServiceRule executor = new ExecutorServiceRule();
+ public static final String THROWING_CACHE_WRITER_EXCEPTION = "to be ignored";
+
private static final String HASHTAG = "{tag}";
private static JedisCluster jedis;
private static MemberVM server1;
@@ -168,6 +182,85 @@ public class MSetDUnitTest {
}
}
+ @Test
+ public void mset_isTransactional() {
+ IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
+ String hashTag = "{" + clusterStartUp.getKeyOnServer("tag", 1) + "}";
+
+ String key1 = hashTag + "key1";
+ String value1 = "value1";
+ jedis.set(key1, value1);
+
+ String listKey = hashTag + "listKey";
+ jedis.lpush(listKey, "1", "2", "3");
+
+ String nonExistent = hashTag + "nonExistentKey";
+
+ String throwingKey = hashTag + "ThrowingRedisString";
+ String throwingKeyValue = "ThrowingRedisStringValue";
+
+ jedis.set(throwingKey, throwingKeyValue);
+
+ // Install a cache writer that will throw an exception if a key with a name equal to throwingKey
+ // is updated or created
+ clusterStartUp.getMember(1).invoke(() -> {
+ RedisClusterStartupRule.getCache()
+ .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+ .getAttributesMutator()
+ .setCacheWriter(new ThrowingCacheWriter(throwingKey));
+ });
+
+ String newValue = "should_not_be_set";
+
+ assertThatThrownBy(
+ () -> jedis.mset(key1, newValue, nonExistent, newValue, throwingKey, newValue))
+ .hasMessage(SERVER_ERROR_MESSAGE);
+
+ assertThat(jedis.get(key1)).isEqualTo(value1);
+ assertThat(jedis.type(listKey)).isEqualTo("list");
+ assertThat(jedis.exists(nonExistent)).isFalse();
+ assertThat(jedis.get(throwingKey)).isEqualTo(throwingKeyValue);
+
+ IgnoredException.removeAllExpectedExceptions();
+ }
+
+ private static class ThrowingCacheWriter implements CacheWriter<RedisKey, RedisData> {
+ private final byte[] keyBytes;
+
+ ThrowingCacheWriter(String key) {
+ keyBytes = key.getBytes(StandardCharsets.UTF_8);
+ }
+
+ @Override
+ public void beforeUpdate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+ if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+ throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+ }
+ }
+
+ @Override
+ public void beforeCreate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+ if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+ throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+ }
+ }
+
+ @Override
+ public void beforeDestroy(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeRegionDestroy(RegionEvent<RedisKey, RedisData> event)
+ throws CacheWriterException {
+
+ }
+
+ @Override
+ public void beforeRegionClear(RegionEvent<RedisKey, RedisData> event)
+ throws CacheWriterException {}
+ }
+
private String[] makeKeysAndValues(String[] keys, String valueBase) {
String[] keysValues = new String[keys.length * 2];
for (int i = 0; i < keys.length * 2; i += 2) {
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java
index 6cd3ded04d..3788ed4cdb 100644
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java
@@ -94,6 +94,12 @@ public abstract class AbstractRPopLPushIntegrationTest implements RedisIntegrati
.hasMessage(ERROR_WRONG_TYPE);
}
+ @Test
+ public void rPopLPush_withNonexistentSourceKeyAndNonListDestinationKey_returnsNull() {
+ jedis.set(DESTINATION_KEY, "not_a_list");
+ assertThat(jedis.rpoplpush(SOURCE_KEY, DESTINATION_KEY)).isNull();
+ }
+
@Test
public void rPopLPush_withNonexistentSourceKey_returnsNull() {
assertThat(jedis.rpoplpush(SOURCE_KEY, DESTINATION_KEY)).isNull();
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSMoveIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSMoveIntegrationTest.java
index 2c3841377c..8361881189 100755
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSMoveIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSMoveIntegrationTest.java
@@ -87,6 +87,26 @@ public abstract class AbstractSMoveIntegrationTest implements RedisIntegrationTe
.hasMessage(ERROR_WRONG_TYPE);
}
+ @Test
+ public void smove_withNonExistentSourceAndWrongTypeDestination_returnsZero() {
+ jedis.set(DESTINATION_KEY, "not a RedisSet");
+
+ assertThat(jedis.smove(NON_EXISTENT_SET_KEY, DESTINATION_KEY, MOVED_MEMBER))
+ .isEqualTo(0);
+ }
+
+ @Test
+ public void smove_withNonExistentMemberInSourceAndDestinationNotASet_returnsWrongTypeError() {
+ String nonExistentMember = "foo";
+ jedis.sadd(SOURCE_KEY, SOURCE_MEMBERS);
+ jedis.set(DESTINATION_KEY, "not a set");
+
+ assertThat(jedis.smove(NON_EXISTENT_SET_KEY, DESTINATION_KEY, nonExistentMember))
+ .isEqualTo(0);
+ assertThatThrownBy(() -> jedis.sismember(DESTINATION_KEY, nonExistentMember))
+ .hasMessage(ERROR_WRONG_TYPE);
+ }
+
@Test
public void smove_withNonExistentSource_returnsZero_sourceKeyDoesNotExist() {
jedis.sadd(DESTINATION_KEY, DESTINATION_MEMBERS);
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/string/SetExecutor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/string/SetExecutor.java
index c017b3bfd3..75b55e4cf4 100755
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/string/SetExecutor.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/string/SetExecutor.java
@@ -215,8 +215,9 @@ public class SetExecutor implements CommandExecutor {
SetOptions options) {
RedisString redisString;
RedisData redisData = regionProvider.getRedisData(key);
+ boolean isTransaction = redisData.txActive(regionProvider.getDataRegion());
- if (redisData.isNull() || redisData.getType() != REDIS_STRING) {
+ if (redisData.isNull() || redisData.getType() != REDIS_STRING || isTransaction) {
redisString = new RedisString(value);
redisString.handleSetExpiration(options);
regionProvider.getDataRegion().put(key, redisString);
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
index 96f2497d62..e2532ae885 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
@@ -420,7 +420,8 @@ public abstract class AbstractRedisData implements RedisData {
}
}
- private boolean txActive(Region<RedisKey, RedisData> region) {
+ @Override
+ public boolean txActive(Region<RedisKey, RedisData> region) {
TXId txId;
if (region instanceof LocalDataSet) {
txId = ((LocalDataSet) region).getProxy().getTXId();
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
index 253b10e269..f2e447bbec 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
@@ -97,6 +97,11 @@ public class NullRedisData implements RedisData {
return "none";
}
+ @Override
+ public boolean txActive(Region<RedisKey, RedisData> region) {
+ return false;
+ }
+
@Override
public boolean rename(ExecutionHandlerContext context, RedisKey oldKey, RedisKey newKey,
boolean ifNotExists) {
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
index 673c47e2f6..c09a6ff0e4 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
@@ -70,6 +70,8 @@ public interface RedisData extends Delta, DataSerializable, Sizeable {
String type();
+ boolean txActive(Region<RedisKey, RedisData> region);
+
boolean rename(ExecutionHandlerContext context, RedisKey oldKey, RedisKey newKey,
boolean ifTargetNotExists);
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
index 4dc66335ad..14d283dfe6 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
@@ -68,6 +68,13 @@ public class RedisList extends AbstractRedisData {
this.elementList = new SizeableByteArrayList();
}
+ public RedisList(RedisList redisList) {
+ setExpirationTimestampNoDelta(redisList.getExpirationTimestamp());
+ setVersion(redisList.getVersion());
+ elementList = new SizeableByteArrayList();
+ elementList.addAll(redisList.elementList);
+ }
+
public static List<byte[]> blpop(ExecutionHandlerContext context, Command command,
List<RedisKey> keys, double timeoutSeconds) {
RegionProvider regionProvider = context.getRegionProvider();
@@ -365,11 +372,21 @@ public class RedisList extends AbstractRedisData {
RedisKey destination) {
RegionProvider regionProvider = context.getRegionProvider();
RedisList sourceList = regionProvider.getTypedRedisData(REDIS_LIST, source, false);
- RedisList destinationList = regionProvider.getTypedRedisData(REDIS_LIST, destination, false);
+
+ if (sourceList.isNull()) {
+ return null;
+ }
+
+ RedisList newSourceList = new RedisList(sourceList);
Region<RedisKey, RedisData> region = regionProvider.getDataRegion();
- byte[] moved = sourceList.rpop(region, source);
- if (moved != null) {
- destinationList.lpush(context, Collections.singletonList(moved), destination, false);
+ byte[] moved = newSourceList.rpop(region, source);
+
+ if (source.equals(destination)) {
+ newSourceList.lpush(context, Collections.singletonList(moved), destination, false);
+ } else {
+ RedisList destinationList = regionProvider.getTypedRedisData(REDIS_LIST, destination, false);
+ new RedisList(destinationList).lpush(context, Collections.singletonList(moved), destination,
+ false);
}
return moved;
}
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
index 17a8b8a66f..1db2d2aaac 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
@@ -73,6 +73,13 @@ public class RedisSet extends AbstractRedisData {
this.members = members;
}
+ public RedisSet(RedisSet redisSet) {
+ setExpirationTimestampNoDelta(redisSet.getExpirationTimestamp());
+ setVersion(redisSet.getVersion());
+ members = new MemberSet(redisSet.members.size());
+ members.addAll(redisSet.members);
+ }
+
public RedisSet(int expectedSize) {
members = new MemberSet(expectedSize);
}
@@ -85,14 +92,30 @@ public class RedisSet extends AbstractRedisData {
public static int smove(RedisKey sourceKey, RedisKey destKey, byte[] member,
RegionProvider regionProvider) {
RedisSet source = regionProvider.getTypedRedisData(REDIS_SET, sourceKey, false);
+
+ if (source.isNull()) {
+ return 0;
+ }
+
+ if (sourceKey.equals(destKey)) {
+ if (source.sismember(member)) {
+ return 1;
+ } else {
+ return 0;
+ }
+ }
+
RedisSet destination = regionProvider.getTypedRedisData(REDIS_SET, destKey, false);
List<byte[]> memberList = new ArrayList<>();
memberList.add(member);
- if (source.srem(memberList, regionProvider.getDataRegion(), sourceKey) == 0) {
+ RedisSet newSource = new RedisSet(source);
+ if (newSource.srem(memberList, regionProvider.getDataRegion(), sourceKey) == 0) {
return 0;
+ } else {
+ RedisSet newDestination = new RedisSet(destination);
+ newDestination.sadd(memberList, regionProvider.getDataRegion(), destKey);
+ return 1;
}
- destination.sadd(memberList, regionProvider.getDataRegion(), destKey);
- return 1;
}
public static MemberSet sunion(RegionProvider regionProvider, List<RedisKey> keys,