You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ri...@apache.org on 2022/04/06 15:10:23 UTC

[geode] branch develop updated: GEODE-10121: Fix transactional redis commands (#7513)

This is an automated email from the ASF dual-hosted git repository.

ringles pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 141d43dc67 GEODE-10121: Fix transactional redis commands (#7513)
141d43dc67 is described below

commit 141d43dc67452af4940c24203719e6193be42300
Author: Bala Kaza Venkata <43...@users.noreply.github.com>
AuthorDate: Wed Apr 6 11:10:17 2022 -0400

    GEODE-10121: Fix transactional redis commands (#7513)
    
    * GEODE-10121: Fix transactional redis commands
    
    MSET, SMOVE commands are transactional but they have not been working as
    transactional. This commit will fix these commands to behave that way.
    
    Authored-by: Bala Kaza Venkata <bk...@vmware.com>
---
 .../commands/executor/list/RPopLPushDUnitTest.java | 121 +++++----
 .../commands/executor/set/SMoveDunitTest.java      | 276 +++++++++++++++++++++
 .../commands/executor/string/MSetDUnitTest.java    |  93 +++++++
 .../list/AbstractRPopLPushIntegrationTest.java     |   6 +
 .../executor/set/AbstractSMoveIntegrationTest.java |  20 ++
 .../commands/executor/string/SetExecutor.java      |   3 +-
 .../redis/internal/data/AbstractRedisData.java     |   3 +-
 .../geode/redis/internal/data/NullRedisData.java   |   5 +
 .../geode/redis/internal/data/RedisData.java       |   2 +
 .../geode/redis/internal/data/RedisList.java       |  25 +-
 .../apache/geode/redis/internal/data/RedisSet.java |  29 ++-
 11 files changed, 525 insertions(+), 58 deletions(-)

diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java
index e038d2c226..fb338df31d 100644
--- a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java
+++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/list/RPopLPushDUnitTest.java
@@ -22,6 +22,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -32,19 +33,18 @@ import java.util.stream.IntStream;
 
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import redis.clients.jedis.HostAndPort;
 import redis.clients.jedis.JedisCluster;
 
-import org.apache.geode.cache.Region;
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.data.RedisKey;
-import org.apache.geode.redis.internal.data.RedisList;
-import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 import org.apache.geode.test.dunit.IgnoredException;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@@ -52,7 +52,7 @@ import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 public class RPopLPushDUnitTest {
   public static final String KEY_1 = "key1";
   public static final String KEY_2 = "key2";
-  public static final String THROWING_REDIS_LIST_EXCEPTION = "to be ignored";
+  public static final String THROWING_CACHE_WRITER_EXCEPTION = "to be ignored";
 
   @Rule
   public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule();
@@ -142,51 +142,87 @@ public class RPopLPushDUnitTest {
     future3.get();
   }
 
-  @Ignore("GEODE-10121")
   @Test
   public void rpoplpush_isTransactional() {
-    String hashTag = "{" + clusterStartUp.getKeyOnServer("tag", 1) + "}";
+    IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
 
-    // Create two real RedisList entries
-    String sourceKey = hashTag + KEY_1;
-    String[] sourceElements = {"sourceElement1", "sourceElement2"};
-    jedis.lpush(sourceKey, sourceElements);
-    String destinationKey = hashTag + KEY_2;
-    String destinationElement = "destinationElement";
-    jedis.lpush(destinationKey, destinationElement);
+    int primaryVMIndex = 1;
+    final String tag = "{" + clusterStartUp.getKeyOnServer("tag", primaryVMIndex) + "}";
+
+    final String sourceKey = tag + KEY_1;
+    int sourceSize = 2;
+    List<String> initialElements = makeInitialElementsList(sourceSize);
+    jedis.lpush(sourceKey, initialElements.toArray(new String[0]));
 
-    String throwingRedisListKey = hashTag + "ThrowingRedisList";
-    String throwingListElement = "shouldNotMove";
+    String throwingKey = tag + "ThrowingRedisString";
+    int destinationSize = 2;
+    List<String> elementsForThrowingKey = makeInitialElementsList(sourceSize);
+    jedis.lpush(throwingKey, elementsForThrowingKey.toArray(new String[0]));
 
-    // Put a test version of RedisList directly into the region that throws if rpop() or lpush() are
-    // called on it
+    // Install a cache writer that will throw an exception if a key with a name equal to throwingKey
+    // is updated or created
     clusterStartUp.getMember(1).invoke(() -> {
-      RedisKey throwingKey = new RedisKey(throwingRedisListKey.getBytes(StandardCharsets.UTF_8));
-      ThrowingRedisList throwingRedisList = new ThrowingRedisList();
-      throwingRedisList.elementInsert(throwingListElement.getBytes(StandardCharsets.UTF_8), 0);
-      ClusterStartupRule.getCache().getRegion(DEFAULT_REDIS_REGION_NAME).put(throwingKey,
-          throwingRedisList);
+      RedisClusterStartupRule.getCache()
+          .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+          .getAttributesMutator()
+          .setCacheWriter(new RPopLPushDUnitTest.ThrowingCacheWriter(throwingKey));
     });
 
-    IgnoredException.addIgnoredException(THROWING_REDIS_LIST_EXCEPTION);
+    assertThatThrownBy(
+        () -> jedis.rpoplpush(sourceKey, throwingKey))
+            .hasMessage(SERVER_ERROR_MESSAGE);
 
-    // Test with an exception being thrown from the source RedisList
-    assertThatThrownBy(() -> jedis.rpoplpush(throwingRedisListKey, destinationKey))
-        .hasMessage(SERVER_ERROR_MESSAGE);
-
-    assertThat(jedis.lrange(throwingRedisListKey, 0, -1)).containsExactly(throwingListElement);
-    assertThat(jedis.lrange(destinationKey, 0, -1)).containsExactly(destinationElement);
+    List<String> reversedInitialElements = new ArrayList<>(initialElements);
+    Collections.reverse(reversedInitialElements);
 
-    // Test with an exception being thrown from the destination RedisList
-    assertThatThrownBy(() -> jedis.rpoplpush(sourceKey, throwingRedisListKey))
-        .hasMessage(SERVER_ERROR_MESSAGE);
+    List<String> reversedElementsForThrowingKey = new ArrayList<>(elementsForThrowingKey);
+    Collections.reverse(reversedElementsForThrowingKey);
 
-    assertThat(jedis.lrange(sourceKey, 0, -1)).containsExactlyInAnyOrder(sourceElements);
-    assertThat(jedis.lrange(throwingRedisListKey, 0, -1)).containsExactly(throwingRedisListKey);
+    // Assert rpoplpush has not happened
+    assertThat(jedis.lrange(sourceKey, 0, -1)).containsExactlyElementsOf(reversedInitialElements);
+    assertThat(jedis.lrange(throwingKey, 0, -1))
+        .containsExactlyElementsOf(reversedElementsForThrowingKey);
 
     IgnoredException.removeAllExpectedExceptions();
   }
 
+  private static class ThrowingCacheWriter implements CacheWriter<RedisKey, RedisData> {
+    private final byte[] keyBytes;
+
+    ThrowingCacheWriter(String key) {
+      keyBytes = key.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void beforeUpdate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+      if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+        throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+      }
+    }
+
+    @Override
+    public void beforeCreate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+      if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+        throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+      }
+    }
+
+    @Override
+    public void beforeDestroy(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+
+    }
+
+    @Override
+    public void beforeRegionDestroy(RegionEvent<RedisKey, RedisData> event)
+        throws CacheWriterException {
+
+    }
+
+    @Override
+    public void beforeRegionClear(RegionEvent<RedisKey, RedisData> event)
+        throws CacheWriterException {}
+  }
+
   private List<String> getHashTagsForEachServer() {
     List<String> hashTags = new ArrayList<>();
     hashTags.add("{" + clusterStartUp.getKeyOnServer("tag", 1) + "}");
@@ -248,17 +284,4 @@ public class RPopLPushDUnitTest {
       }
     }
   }
-
-  static class ThrowingRedisList extends RedisList {
-    @Override
-    public long lpush(ExecutionHandlerContext context, List<byte[]> elementsToAdd, RedisKey key,
-        boolean onlyIfExists) {
-      throw new RuntimeException(THROWING_REDIS_LIST_EXCEPTION);
-    }
-
-    @Override
-    public byte[] rpop(Region<RedisKey, RedisData> region, RedisKey key) {
-      throw new RuntimeException(THROWING_REDIS_LIST_EXCEPTION);
-    }
-  }
 }
diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/set/SMoveDunitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/set/SMoveDunitTest.java
new file mode 100644
index 0000000000..edc9e2d790
--- /dev/null
+++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/set/SMoveDunitTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.commands.executor.set;
+
+import static org.apache.geode.redis.internal.RedisConstants.SERVER_ERROR_MESSAGE;
+import static org.apache.geode.redis.internal.services.RegionProvider.DEFAULT_REDIS_REGION_NAME;
+import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.JedisCluster;
+
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class SMoveDunitTest {
+
+  public static final String THROWING_CACHE_WRITER_EXCEPTION = "to be ignored";
+
+  @Rule
+  public RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule();
+
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  public static final String KEY_1 = "key1";
+  public static final String KEY_2 = "key2";
+  private static JedisCluster jedis;
+
+  @Before
+  public void testSetup() {
+    MemberVM locator = clusterStartUp.startLocatorVM(0);
+    clusterStartUp.startRedisVM(1, locator.getPort());
+    clusterStartUp.startRedisVM(2, locator.getPort());
+    clusterStartUp.startRedisVM(3, locator.getPort());
+    int redisServerPort = clusterStartUp.getRedisPort(1);
+    jedis = new JedisCluster(new HostAndPort(BIND_ADDRESS, redisServerPort), 20_000);
+    clusterStartUp.flushAll();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    jedis.close();
+  }
+
+
+  @Test
+  public void shouldDistributeDataAmongCluster_andRetainDataAfterServerCrash() {
+    int primaryVMIndex = 1;
+    final String tag = "{" + clusterStartUp.getKeyOnServer("tag", primaryVMIndex) + "}";
+    final String sourceKey = tag + KEY_1;
+    final String destinationKey = tag + KEY_2;
+
+
+    final int elementsToMove = 5;
+    final int initialElementCount = elementsToMove * 2;
+
+    List<String> members = makeMemberList(initialElementCount, "member1-");
+
+    jedis.sadd(sourceKey, members.toArray(new String[] {}));
+
+    // Move half the elements from the source set to the destination
+    for (int i = 0; i < elementsToMove; ++i) {
+      assertThat(jedis.smove(sourceKey, destinationKey, members.get(i))).isEqualTo(1);
+    }
+
+    clusterStartUp.crashVM(primaryVMIndex); // kill primary server
+
+    assertThat(jedis.smembers(sourceKey))
+        .containsExactlyInAnyOrderElementsOf(members.subList(elementsToMove, initialElementCount));
+    assertThat(jedis.smembers(destinationKey))
+        .containsExactlyInAnyOrderElementsOf(members.subList(0, elementsToMove));
+  }
+
+  @Test
+  public void givenBucketsMovedDuringSMove_thenOperationsAreNotLostOrDuplicated()
+      throws InterruptedException, ExecutionException {
+
+    final AtomicBoolean continueRunning = new AtomicBoolean(true);
+    final List<String> hashTags = getHashTagsForEachServer();
+
+    List<String> members1 = makeMemberList(10, "member1-");
+    List<String> members2 = makeMemberList(10, "member2-");
+    List<String> members3 = makeMemberList(10, "member3-");
+
+    jedis.sadd(hashTags.get(0) + KEY_1, members1.toArray(new String[] {}));
+    jedis.sadd(hashTags.get(1) + KEY_1, members2.toArray(new String[] {}));
+    jedis.sadd(hashTags.get(2) + KEY_1, members3.toArray(new String[] {}));
+
+    Future<Void> future1 = executor.runAsync(() -> repeatSMove(hashTags.get(0),
+        members1, continueRunning));
+    Future<Void> future2 = executor.runAsync(() -> repeatSMove(hashTags.get(1),
+        members2, continueRunning));
+    Future<Void> future3 =
+        executor.runAsync(() -> repeatSMoveWithSameSourceAndDest(hashTags.get(2),
+            members3, continueRunning));
+
+    for (int i = 0; i < 25 && continueRunning.get(); i++) {
+      clusterStartUp.moveBucketForKey(hashTags.get(i % hashTags.size()));
+      Thread.sleep(200);
+    }
+
+    continueRunning.set(false);
+
+    future1.get();
+    future2.get();
+    future3.get();
+  }
+
+  @Test
+  public void smove_isTransactional() {
+    IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
+
+    int primaryVMIndex = 1;
+    final String tag = "{" + clusterStartUp.getKeyOnServer("tag", primaryVMIndex) + "}";
+
+    final String sourceKey = tag + KEY_1;
+    int sourceSize = 2;
+    List<String> members = makeMemberList(sourceSize, "member1-");
+    jedis.sadd(sourceKey, members.toArray(new String[] {}));
+
+    String throwingKey = tag + "ThrowingRedisString";
+    int destinationSize = 2;
+    List<String> membersForThrowingKey = makeMemberList(2, "ThrowingRedisStringValue");
+    jedis.sadd(throwingKey, membersForThrowingKey.toArray(new String[] {}));
+
+    // Install a cache writer that will throw an exception if a key with a name equal to throwingKey
+    // is updated or created
+    clusterStartUp.getMember(1).invoke(() -> {
+      RedisClusterStartupRule.getCache()
+          .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+          .getAttributesMutator()
+          .setCacheWriter(new SMoveDunitTest.ThrowingCacheWriter(throwingKey));
+    });
+
+    String memberToRemove = "member1-0";
+
+    assertThatThrownBy(
+        () -> jedis.smove(sourceKey, throwingKey, memberToRemove))
+            .hasMessage(SERVER_ERROR_MESSAGE);
+
+    // Assert smove has not happened
+    assertThat(jedis.smembers(sourceKey)).containsExactlyInAnyOrderElementsOf(members);
+    assertThat(jedis.smembers(throwingKey))
+        .containsExactlyInAnyOrderElementsOf(membersForThrowingKey);
+
+    IgnoredException.removeAllExpectedExceptions();
+  }
+
+  private static class ThrowingCacheWriter implements CacheWriter<RedisKey, RedisData> {
+    private final byte[] keyBytes;
+
+    ThrowingCacheWriter(String key) {
+      keyBytes = key.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void beforeUpdate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+      if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+        throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+      }
+    }
+
+    @Override
+    public void beforeCreate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+      if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+        throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+      }
+    }
+
+    @Override
+    public void beforeDestroy(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+
+    }
+
+    @Override
+    public void beforeRegionDestroy(RegionEvent<RedisKey, RedisData> event)
+        throws CacheWriterException {
+
+    }
+
+    @Override
+    public void beforeRegionClear(RegionEvent<RedisKey, RedisData> event)
+        throws CacheWriterException {}
+  }
+
+  private void repeatSMove(String hashTag, List<String> initialElements,
+      AtomicBoolean continueRunning) {
+    String source = hashTag + KEY_1;
+    String destination = hashTag + KEY_2;
+
+    while (continueRunning.get()) {
+      for (int i = 0; i < initialElements.size(); i++) {
+        int movedIndex = (initialElements.size() - 1) - i;
+        assertThat(jedis.smove(source, destination, initialElements.get(movedIndex))).isEqualTo(1);
+
+        // Confirm we moved the correct element
+        assertThat(jedis.smembers(source))
+            .containsExactlyInAnyOrderElementsOf(initialElements.subList(0, movedIndex));
+        assertThat(jedis.smembers(destination)).containsExactlyInAnyOrderElementsOf(
+            initialElements.subList(movedIndex, initialElements.size()));
+      }
+
+      // All elements have been moved
+      assertThat(jedis.exists(source)).isFalse();
+
+      // Swap the source and destination keys
+      String tmp = source;
+      source = destination;
+      destination = tmp;
+    }
+  }
+
+  private void repeatSMoveWithSameSourceAndDest(String hashTag, List<String> initialElements,
+      AtomicBoolean continueRunning) {
+    String key = hashTag + KEY_1;
+
+    while (continueRunning.get()) {
+      for (String element : initialElements) {
+        assertThat(jedis.smove(key, key, element)).isEqualTo(1);
+        Collections.rotate(initialElements, 1);
+        assertThat(jedis.smembers(key)).containsExactlyInAnyOrderElementsOf(initialElements);
+      }
+    }
+  }
+
+  private List<String> getHashTagsForEachServer() {
+    List<String> hashTags = new ArrayList<>();
+    hashTags.add("{" + clusterStartUp.getKeyOnServer("tag", 1) + "}");
+    hashTags.add("{" + clusterStartUp.getKeyOnServer("tag", 2) + "}");
+    hashTags.add("{" + clusterStartUp.getKeyOnServer("tag", 3) + "}");
+    return hashTags;
+  }
+
+  private List<String> makeMemberList(int setSize, String baseString) {
+    List<String> members = new ArrayList<>();
+    for (int i = 0; i < setSize; i++) {
+      members.add(baseString + i);
+    }
+    return members;
+  }
+}
diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/MSetDUnitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/MSetDUnitTest.java
index 44394cea7d..588b29149b 100644
--- a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/MSetDUnitTest.java
+++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/string/MSetDUnitTest.java
@@ -15,11 +15,16 @@
 
 package org.apache.geode.redis.internal.commands.executor.string;
 
+import static org.apache.geode.redis.internal.RedisConstants.SERVER_ERROR_MESSAGE;
 import static org.apache.geode.redis.internal.SystemPropertyBasedRedisConfiguration.GEODE_FOR_REDIS_PORT;
+import static org.apache.geode.redis.internal.services.RegionProvider.DEFAULT_REDIS_REGION_NAME;
 import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
 import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,11 +39,18 @@ import org.junit.Test;
 import redis.clients.jedis.HostAndPort;
 import redis.clients.jedis.JedisCluster;
 
+import org.apache.geode.cache.CacheWriter;
+import org.apache.geode.cache.CacheWriterException;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.RegionEvent;
 import org.apache.geode.cache.control.RebalanceFactory;
 import org.apache.geode.cache.control.ResourceManager;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
+import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
@@ -54,6 +66,8 @@ public class MSetDUnitTest {
   @ClassRule
   public static ExecutorServiceRule executor = new ExecutorServiceRule();
 
+  public static final String THROWING_CACHE_WRITER_EXCEPTION = "to be ignored";
+
   private static final String HASHTAG = "{tag}";
   private static JedisCluster jedis;
   private static MemberVM server1;
@@ -168,6 +182,85 @@ public class MSetDUnitTest {
     }
   }
 
+  @Test
+  public void mset_isTransactional() {
+    IgnoredException.addIgnoredException(THROWING_CACHE_WRITER_EXCEPTION);
+    String hashTag = "{" + clusterStartUp.getKeyOnServer("tag", 1) + "}";
+
+    String key1 = hashTag + "key1";
+    String value1 = "value1";
+    jedis.set(key1, value1);
+
+    String listKey = hashTag + "listKey";
+    jedis.lpush(listKey, "1", "2", "3");
+
+    String nonExistent = hashTag + "nonExistentKey";
+
+    String throwingKey = hashTag + "ThrowingRedisString";
+    String throwingKeyValue = "ThrowingRedisStringValue";
+
+    jedis.set(throwingKey, throwingKeyValue);
+
+    // Install a cache writer that will throw an exception if a key with a name equal to throwingKey
+    // is updated or created
+    clusterStartUp.getMember(1).invoke(() -> {
+      RedisClusterStartupRule.getCache()
+          .<RedisKey, RedisData>getRegion(DEFAULT_REDIS_REGION_NAME)
+          .getAttributesMutator()
+          .setCacheWriter(new ThrowingCacheWriter(throwingKey));
+    });
+
+    String newValue = "should_not_be_set";
+
+    assertThatThrownBy(
+        () -> jedis.mset(key1, newValue, nonExistent, newValue, throwingKey, newValue))
+            .hasMessage(SERVER_ERROR_MESSAGE);
+
+    assertThat(jedis.get(key1)).isEqualTo(value1);
+    assertThat(jedis.type(listKey)).isEqualTo("list");
+    assertThat(jedis.exists(nonExistent)).isFalse();
+    assertThat(jedis.get(throwingKey)).isEqualTo(throwingKeyValue);
+
+    IgnoredException.removeAllExpectedExceptions();
+  }
+
+  private static class ThrowingCacheWriter implements CacheWriter<RedisKey, RedisData> {
+    private final byte[] keyBytes;
+
+    ThrowingCacheWriter(String key) {
+      keyBytes = key.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public void beforeUpdate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+      if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+        throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+      }
+    }
+
+    @Override
+    public void beforeCreate(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+      if (Arrays.equals(event.getKey().toBytes(), keyBytes)) {
+        throw new CacheWriterException(THROWING_CACHE_WRITER_EXCEPTION);
+      }
+    }
+
+    @Override
+    public void beforeDestroy(EntryEvent<RedisKey, RedisData> event) throws CacheWriterException {
+
+    }
+
+    @Override
+    public void beforeRegionDestroy(RegionEvent<RedisKey, RedisData> event)
+        throws CacheWriterException {
+
+    }
+
+    @Override
+    public void beforeRegionClear(RegionEvent<RedisKey, RedisData> event)
+        throws CacheWriterException {}
+  }
+
   private String[] makeKeysAndValues(String[] keys, String valueBase) {
     String[] keysValues = new String[keys.length * 2];
     for (int i = 0; i < keys.length * 2; i += 2) {
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java
index 6cd3ded04d..3788ed4cdb 100644
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/list/AbstractRPopLPushIntegrationTest.java
@@ -94,6 +94,12 @@ public abstract class AbstractRPopLPushIntegrationTest implements RedisIntegrati
         .hasMessage(ERROR_WRONG_TYPE);
   }
 
+  @Test
+  public void rPopLPush_withNonexistentSourceKeyAndNonListDestinationKey_returnsNull() {
+    jedis.set(DESTINATION_KEY, "not_a_list");
+    assertThat(jedis.rpoplpush(SOURCE_KEY, DESTINATION_KEY)).isNull();
+  }
+
   @Test
   public void rPopLPush_withNonexistentSourceKey_returnsNull() {
     assertThat(jedis.rpoplpush(SOURCE_KEY, DESTINATION_KEY)).isNull();
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSMoveIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSMoveIntegrationTest.java
index 2c3841377c..8361881189 100755
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSMoveIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSMoveIntegrationTest.java
@@ -87,6 +87,26 @@ public abstract class AbstractSMoveIntegrationTest implements RedisIntegrationTe
         .hasMessage(ERROR_WRONG_TYPE);
   }
 
+  @Test
+  public void smove_withNonExistentSourceAndWrongTypeDestination_returnsZero() {
+    jedis.set(DESTINATION_KEY, "not a RedisSet");
+
+    assertThat(jedis.smove(NON_EXISTENT_SET_KEY, DESTINATION_KEY, MOVED_MEMBER))
+        .isEqualTo(0);
+  }
+
+  @Test
+  public void smove_withNonExistentMemberInSourceAndDestinationNotASet_returnsWrongTypeError() {
+    String nonExistentMember = "foo";
+    jedis.sadd(SOURCE_KEY, SOURCE_MEMBERS);
+    jedis.set(DESTINATION_KEY, "not a set");
+
+    assertThat(jedis.smove(NON_EXISTENT_SET_KEY, DESTINATION_KEY, nonExistentMember))
+        .isEqualTo(0);
+    assertThatThrownBy(() -> jedis.sismember(DESTINATION_KEY, nonExistentMember))
+        .hasMessage(ERROR_WRONG_TYPE);
+  }
+
   @Test
   public void smove_withNonExistentSource_returnsZero_sourceKeyDoesNotExist() {
     jedis.sadd(DESTINATION_KEY, DESTINATION_MEMBERS);
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/string/SetExecutor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/string/SetExecutor.java
index c017b3bfd3..75b55e4cf4 100755
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/string/SetExecutor.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/string/SetExecutor.java
@@ -215,8 +215,9 @@ public class SetExecutor implements CommandExecutor {
       SetOptions options) {
     RedisString redisString;
     RedisData redisData = regionProvider.getRedisData(key);
+    boolean isTransaction = redisData.txActive(regionProvider.getDataRegion());
 
-    if (redisData.isNull() || redisData.getType() != REDIS_STRING) {
+    if (redisData.isNull() || redisData.getType() != REDIS_STRING || isTransaction) {
       redisString = new RedisString(value);
       redisString.handleSetExpiration(options);
       regionProvider.getDataRegion().put(key, redisString);
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
index 96f2497d62..e2532ae885 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
@@ -420,7 +420,8 @@ public abstract class AbstractRedisData implements RedisData {
     }
   }
 
-  private boolean txActive(Region<RedisKey, RedisData> region) {
+  @Override
+  public boolean txActive(Region<RedisKey, RedisData> region) {
     TXId txId;
     if (region instanceof LocalDataSet) {
       txId = ((LocalDataSet) region).getProxy().getTXId();
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
index 253b10e269..f2e447bbec 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
@@ -97,6 +97,11 @@ public class NullRedisData implements RedisData {
     return "none";
   }
 
+  @Override
+  public boolean txActive(Region<RedisKey, RedisData> region) {
+    return false;
+  }
+
   @Override
   public boolean rename(ExecutionHandlerContext context, RedisKey oldKey, RedisKey newKey,
       boolean ifNotExists) {
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
index 673c47e2f6..c09a6ff0e4 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
@@ -70,6 +70,8 @@ public interface RedisData extends Delta, DataSerializable, Sizeable {
 
   String type();
 
+  boolean txActive(Region<RedisKey, RedisData> region);
+
   boolean rename(ExecutionHandlerContext context, RedisKey oldKey, RedisKey newKey,
       boolean ifTargetNotExists);
 
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
index 4dc66335ad..14d283dfe6 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisList.java
@@ -68,6 +68,13 @@ public class RedisList extends AbstractRedisData {
     this.elementList = new SizeableByteArrayList();
   }
 
+  public RedisList(RedisList redisList) {
+    setExpirationTimestampNoDelta(redisList.getExpirationTimestamp());
+    setVersion(redisList.getVersion());
+    elementList = new SizeableByteArrayList();
+    elementList.addAll(redisList.elementList);
+  }
+
   public static List<byte[]> blpop(ExecutionHandlerContext context, Command command,
       List<RedisKey> keys, double timeoutSeconds) {
     RegionProvider regionProvider = context.getRegionProvider();
@@ -365,11 +372,21 @@ public class RedisList extends AbstractRedisData {
       RedisKey destination) {
     RegionProvider regionProvider = context.getRegionProvider();
     RedisList sourceList = regionProvider.getTypedRedisData(REDIS_LIST, source, false);
-    RedisList destinationList = regionProvider.getTypedRedisData(REDIS_LIST, destination, false);
+
+    if (sourceList.isNull()) {
+      return null;
+    }
+
+    RedisList newSourceList = new RedisList(sourceList);
     Region<RedisKey, RedisData> region = regionProvider.getDataRegion();
-    byte[] moved = sourceList.rpop(region, source);
-    if (moved != null) {
-      destinationList.lpush(context, Collections.singletonList(moved), destination, false);
+    byte[] moved = newSourceList.rpop(region, source);
+
+    if (source.equals(destination)) {
+      newSourceList.lpush(context, Collections.singletonList(moved), destination, false);
+    } else {
+      RedisList destinationList = regionProvider.getTypedRedisData(REDIS_LIST, destination, false);
+      new RedisList(destinationList).lpush(context, Collections.singletonList(moved), destination,
+          false);
     }
     return moved;
   }
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
index 17a8b8a66f..1db2d2aaac 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
@@ -73,6 +73,13 @@ public class RedisSet extends AbstractRedisData {
     this.members = members;
   }
 
+  public RedisSet(RedisSet redisSet) {
+    setExpirationTimestampNoDelta(redisSet.getExpirationTimestamp());
+    setVersion(redisSet.getVersion());
+    members = new MemberSet(redisSet.members.size());
+    members.addAll(redisSet.members);
+  }
+
   public RedisSet(int expectedSize) {
     members = new MemberSet(expectedSize);
   }
@@ -85,14 +92,30 @@ public class RedisSet extends AbstractRedisData {
   public static int smove(RedisKey sourceKey, RedisKey destKey, byte[] member,
       RegionProvider regionProvider) {
     RedisSet source = regionProvider.getTypedRedisData(REDIS_SET, sourceKey, false);
+
+    if (source.isNull()) {
+      return 0;
+    }
+
+    if (sourceKey.equals(destKey)) {
+      if (source.sismember(member)) {
+        return 1;
+      } else {
+        return 0;
+      }
+    }
+
     RedisSet destination = regionProvider.getTypedRedisData(REDIS_SET, destKey, false);
     List<byte[]> memberList = new ArrayList<>();
     memberList.add(member);
-    if (source.srem(memberList, regionProvider.getDataRegion(), sourceKey) == 0) {
+    RedisSet newSource = new RedisSet(source);
+    if (newSource.srem(memberList, regionProvider.getDataRegion(), sourceKey) == 0) {
       return 0;
+    } else {
+      RedisSet newDestination = new RedisSet(destination);
+      newDestination.sadd(memberList, regionProvider.getDataRegion(), destKey);
+      return 1;
     }
-    destination.sadd(memberList, regionProvider.getDataRegion(), destKey);
-    return 1;
   }
 
   public static MemberSet sunion(RegionProvider regionProvider, List<RedisKey> keys,