You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2022/02/02 21:40:36 UTC

[geode] branch support/1.15 updated: GEODE-9994 Make Redis RENAME atomic (#7328)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.15 by this push:
     new bff8082  GEODE-9994 Make Redis RENAME atomic (#7328)
bff8082 is described below

commit bff808245ab59e71a13e393a6301af444393abc4
Author: Eric Zoerner <zo...@vmware.com>
AuthorDate: Wed Feb 2 13:34:19 2022 -0800

    GEODE-9994 Make Redis RENAME atomic (#7328)
    
    - change verification to allow for erroneous client retries
    - change error logging to warning level to prevent spurious test failures caused by error-level log output
    
    (cherry picked from commit 6413c33d6b0050a1ebe7e88f9ea20295ab780cab)
---
 .../commands/executor/key/RenameDUnitTest.java     | 117 +++++++++++++++++----
 .../executor/key/AbstractRenameExecutor.java       |   4 +-
 2 files changed, 101 insertions(+), 20 deletions(-)

diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/key/RenameDUnitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/key/RenameDUnitTest.java
index 73092a9..02dba47 100644
--- a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/key/RenameDUnitTest.java
+++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/key/RenameDUnitTest.java
@@ -15,6 +15,7 @@
 package org.apache.geode.redis.internal.commands.executor.key;
 
 
+import static org.apache.geode.distributed.ConfigurationProperties.GEODE_FOR_REDIS_PORT;
 import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
 import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -33,6 +34,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.logging.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -43,16 +45,22 @@ import redis.clients.jedis.HostAndPort;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisCluster;
 
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.redis.internal.RedisConstants;
 import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.services.locking.LockingStripedCoordinator;
 import org.apache.geode.redis.internal.services.locking.StripedCoordinator;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class RenameDUnitTest {
+  private static final Logger logger = LogService.getLogger();
 
   @ClassRule
   public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(3);
@@ -61,14 +69,23 @@ public class RenameDUnitTest {
   public ExecutorServiceRule executor = new ExecutorServiceRule();
 
   private static JedisCluster jedisCluster;
-  private static MemberVM locator;
+  private static MemberVM server1;
+  private static int locatorPort;
+  private static int server3Port;
 
   @BeforeClass
   public static void setup() {
-    locator = clusterStartUp.startLocatorVM(0);
-    clusterStartUp.startRedisVM(1, locator.getPort());
-    clusterStartUp.startRedisVM(2, locator.getPort());
-    clusterStartUp.startRedisVM(3, locator.getPort());
+    final MemberVM locator = clusterStartUp.startLocatorVM(0);
+    locatorPort = locator.getPort();
+    server1 = clusterStartUp.startRedisVM(1, locatorPort);
+    clusterStartUp.startRedisVM(2, locatorPort);
+
+    server3Port = AvailablePortHelper.getRandomAvailableTCPPort();
+    final String finalRedisPort = Integer.toString(server3Port);
+    final int finalLocatorPort = locatorPort;
+    clusterStartUp.startRedisVM(3, x -> x
+        .withProperty(GEODE_FOR_REDIS_PORT, finalRedisPort)
+        .withConnectionToLocator(finalLocatorPort));
 
     int redisServerPort1 = clusterStartUp.getRedisPort(1);
     jedisCluster =
@@ -211,6 +228,48 @@ public class RenameDUnitTest {
   }
 
   @Test
+  public void givenCrashDuringRename_thenDoesNotLeaveInconsistencies() throws Exception {
+    final AtomicBoolean running = new AtomicBoolean(true);
+
+    final List<String> hashtags = new ArrayList<>();
+    hashtags.add(clusterStartUp.getKeyOnServer("rename", 1));
+    hashtags.add(clusterStartUp.getKeyOnServer("rename", 2));
+    hashtags.add(clusterStartUp.getKeyOnServer("rename", 3));
+
+    final Runnable task1 = () -> renamePerformAndVerify(1, 10000, hashtags.get(0), running, true);
+    final Runnable task2 = () -> renamePerformAndVerify(2, 10000, hashtags.get(1), running, true);
+    final Runnable task3 = () -> renamePerformAndVerify(3, 10000, hashtags.get(2), running, true);
+
+    final Future<Void> future1 = executor.runAsync(task1);
+    final Future<Void> future2 = executor.runAsync(task2);
+    final Future<Void> future3 = executor.runAsync(task3);
+
+    final String finalRedisPort = Integer.toString(server3Port);
+    final int finalLocatorPort = locatorPort;
+    final Future<?> crasherFuture = executor.submit(() -> {
+      try {
+        for (int i = 0; i < 20 && running.get(); i++) {
+          clusterStartUp.moveBucketForKey(hashtags.get(0), "server-3");
+          // Sleep for a bit so that rename can execute
+          GeodeAwaitility.await().during(Duration.ofMillis(2000)).until(() -> true);
+          clusterStartUp.crashVM(3);
+          clusterStartUp.startRedisVM(3, x -> x
+              .withProperty(GEODE_FOR_REDIS_PORT, finalRedisPort)
+              .withConnectionToLocator(finalLocatorPort));
+          rebalanceAllRegions(server1);
+        }
+      } finally {
+        running.set(false);
+      }
+    });
+
+    future1.get();
+    future2.get();
+    future3.get();
+    crasherFuture.get();
+  }
+
+  @Test
   public void givenBucketsMoveDuringRename_thenDataIsNotLost() throws Exception {
     AtomicBoolean running = new AtomicBoolean(true);
 
@@ -219,9 +278,9 @@ public class RenameDUnitTest {
     hashtags.add(clusterStartUp.getKeyOnServer("rename", 2));
     hashtags.add(clusterStartUp.getKeyOnServer("rename", 3));
 
-    Runnable task1 = () -> renamePerformAndVerify(1, 10000, hashtags.get(0), running);
-    Runnable task2 = () -> renamePerformAndVerify(2, 10000, hashtags.get(1), running);
-    Runnable task3 = () -> renamePerformAndVerify(3, 10000, hashtags.get(2), running);
+    Runnable task1 = () -> renamePerformAndVerify(1, 10000, hashtags.get(0), running, false);
+    Runnable task2 = () -> renamePerformAndVerify(2, 10000, hashtags.get(1), running, false);
+    Runnable task3 = () -> renamePerformAndVerify(3, 10000, hashtags.get(2), running, false);
 
     Future<Void> future1 = executor.runAsync(task1);
     Future<Void> future2 = executor.runAsync(task2);
@@ -239,24 +298,33 @@ public class RenameDUnitTest {
     future3.get();
   }
 
-  private void renamePerformAndVerify(int index, int minimumIterations, String hashtag,
-      AtomicBoolean isRunning) {
-    String baseKey = "{" + hashtag + "}-key-" + index;
+  private void renamePerformAndVerify(final int index, final int minimumIterations,
+      final String hashtag, final AtomicBoolean isRunning, final boolean continueOnError) {
+    final String baseKey = "{" + hashtag + "}-key-" + index;
     jedisCluster.set(baseKey + "-0", "value");
     int iterationCount = 0;
 
     while (iterationCount < minimumIterations || isRunning.get()) {
-      String oldKey = baseKey + "-" + iterationCount;
-      String newKey = baseKey + "-" + (iterationCount + 1);
+      final String oldKey = baseKey + "-" + iterationCount;
+      final String newKey = baseKey + "-" + (iterationCount + 1);
+
+      // it's possible previous rename failed, so make sure oldKey exists
+      jedisCluster.setnx(oldKey, "value");
+
       try {
         jedisCluster.rename(oldKey, newKey);
-      } catch (Exception ex) {
-        isRunning.set(false);
-        throw new RuntimeException("Exception performing RENAME " + oldKey + " " + newKey, ex);
+      } catch (final Exception ex) {
+        if (continueOnError) {
+          logger.warn("Exception performing RENAME " + oldKey + " " + newKey, ex);
+        } else {
+          isRunning.set(false);
+          throw new RuntimeException("Exception performing RENAME " + oldKey + " " + newKey, ex);
+        }
       }
 
-      assertThat(jedisCluster.exists(newKey))
-          .as("key " + newKey + " should exist").isTrue();
+      // verify that renaming occurred as a unit of work / all or nothing
+      assertThat(jedisCluster.exists(newKey)).isEqualTo(!jedisCluster.exists(oldKey));
+
       iterationCount += 1;
     }
   }
@@ -268,4 +336,17 @@ public class RenameDUnitTest {
       throw new RuntimeException(e);
     }
   }
+
+  private static void rebalanceAllRegions(MemberVM vm) {
+    vm.invoke("Running rebalance", () -> {
+      final ResourceManager manager = ClusterStartupRule.getCache().getResourceManager();
+      final RebalanceFactory factory = manager.createRebalanceFactory();
+      try {
+        factory.start().getResults();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    });
+  }
+
 }
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/key/AbstractRenameExecutor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/key/AbstractRenameExecutor.java
index d022e20..eaeb2d2 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/key/AbstractRenameExecutor.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/key/AbstractRenameExecutor.java
@@ -53,9 +53,9 @@ public abstract class AbstractRenameExecutor implements CommandExecutor {
 
   protected static boolean rename(ExecutionHandlerContext context, RedisKey oldKey, RedisKey newKey,
       boolean ifTargetNotExists) {
-    List<RedisKey> lockOrdering = Arrays.asList(oldKey, newKey);
+    final List<RedisKey> keysToLock = Arrays.asList(oldKey, newKey);
 
-    return context.lockedExecute(oldKey, lockOrdering,
+    return context.lockedExecuteInTransaction(oldKey, keysToLock,
         () -> context.getRedisData(oldKey)
             .rename(context.getRegion(), oldKey, newKey, ifTargetNotExists));
   }