You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2022/02/02 21:40:36 UTC
[geode] branch support/1.15 updated: GEODE-9994 Make Redis RENAME atomic (#7328)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.15 by this push:
new bff8082 GEODE-9994 Make Redis RENAME atomic (#7328)
bff8082 is described below
commit bff808245ab59e71a13e393a6301af444393abc4
Author: Eric Zoerner <zo...@vmware.com>
AuthorDate: Wed Feb 2 13:34:19 2022 -0800
GEODE-9994 Make Redis RENAME atomic (#7328)
- change verification to allow for erroneous client retries
- change error logging to warning level to prevent spurious test failures caused by log output
(cherry picked from commit 6413c33d6b0050a1ebe7e88f9ea20295ab780cab)
---
.../commands/executor/key/RenameDUnitTest.java | 117 +++++++++++++++++----
.../executor/key/AbstractRenameExecutor.java | 4 +-
2 files changed, 101 insertions(+), 20 deletions(-)
diff --git a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/key/RenameDUnitTest.java b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/key/RenameDUnitTest.java
index 73092a9..02dba47 100644
--- a/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/key/RenameDUnitTest.java
+++ b/geode-for-redis/src/distributedTest/java/org/apache/geode/redis/internal/commands/executor/key/RenameDUnitTest.java
@@ -15,6 +15,7 @@
package org.apache.geode.redis.internal.commands.executor.key;
+import static org.apache.geode.distributed.ConfigurationProperties.GEODE_FOR_REDIS_PORT;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.BIND_ADDRESS;
import static org.apache.geode.test.dunit.rules.RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT;
import static org.assertj.core.api.Assertions.assertThat;
@@ -33,6 +34,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.logging.log4j.Logger;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -43,16 +45,22 @@ import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
+import org.apache.geode.cache.control.RebalanceFactory;
+import org.apache.geode.cache.control.ResourceManager;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.internal.RedisConstants;
import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.services.locking.LockingStripedCoordinator;
import org.apache.geode.redis.internal.services.locking.StripedCoordinator;
import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class RenameDUnitTest {
+ private static final Logger logger = LogService.getLogger();
@ClassRule
public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(3);
@@ -61,14 +69,23 @@ public class RenameDUnitTest {
public ExecutorServiceRule executor = new ExecutorServiceRule();
private static JedisCluster jedisCluster;
- private static MemberVM locator;
+ private static MemberVM server1;
+ private static int locatorPort;
+ private static int server3Port;
@BeforeClass
public static void setup() {
- locator = clusterStartUp.startLocatorVM(0);
- clusterStartUp.startRedisVM(1, locator.getPort());
- clusterStartUp.startRedisVM(2, locator.getPort());
- clusterStartUp.startRedisVM(3, locator.getPort());
+ final MemberVM locator = clusterStartUp.startLocatorVM(0);
+ locatorPort = locator.getPort();
+ server1 = clusterStartUp.startRedisVM(1, locatorPort);
+ clusterStartUp.startRedisVM(2, locatorPort);
+
+ server3Port = AvailablePortHelper.getRandomAvailableTCPPort();
+ final String finalRedisPort = Integer.toString(server3Port);
+ final int finalLocatorPort = locatorPort;
+ clusterStartUp.startRedisVM(3, x -> x
+ .withProperty(GEODE_FOR_REDIS_PORT, finalRedisPort)
+ .withConnectionToLocator(finalLocatorPort));
int redisServerPort1 = clusterStartUp.getRedisPort(1);
jedisCluster =
@@ -211,6 +228,48 @@ public class RenameDUnitTest {
}
@Test
+ public void givenCrashDuringRename_thenDoesNotLeaveInconsistencies() throws Exception {
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ final List<String> hashtags = new ArrayList<>();
+ hashtags.add(clusterStartUp.getKeyOnServer("rename", 1));
+ hashtags.add(clusterStartUp.getKeyOnServer("rename", 2));
+ hashtags.add(clusterStartUp.getKeyOnServer("rename", 3));
+
+ final Runnable task1 = () -> renamePerformAndVerify(1, 10000, hashtags.get(0), running, true);
+ final Runnable task2 = () -> renamePerformAndVerify(2, 10000, hashtags.get(1), running, true);
+ final Runnable task3 = () -> renamePerformAndVerify(3, 10000, hashtags.get(2), running, true);
+
+ final Future<Void> future1 = executor.runAsync(task1);
+ final Future<Void> future2 = executor.runAsync(task2);
+ final Future<Void> future3 = executor.runAsync(task3);
+
+ final String finalRedisPort = Integer.toString(server3Port);
+ final int finalLocatorPort = locatorPort;
+ final Future<?> crasherFuture = executor.submit(() -> {
+ try {
+ for (int i = 0; i < 20 && running.get(); i++) {
+ clusterStartUp.moveBucketForKey(hashtags.get(0), "server-3");
+ // Sleep for a bit so that rename can execute
+ GeodeAwaitility.await().during(Duration.ofMillis(2000)).until(() -> true);
+ clusterStartUp.crashVM(3);
+ clusterStartUp.startRedisVM(3, x -> x
+ .withProperty(GEODE_FOR_REDIS_PORT, finalRedisPort)
+ .withConnectionToLocator(finalLocatorPort));
+ rebalanceAllRegions(server1);
+ }
+ } finally {
+ running.set(false);
+ }
+ });
+
+ future1.get();
+ future2.get();
+ future3.get();
+ crasherFuture.get();
+ }
+
+ @Test
public void givenBucketsMoveDuringRename_thenDataIsNotLost() throws Exception {
AtomicBoolean running = new AtomicBoolean(true);
@@ -219,9 +278,9 @@ public class RenameDUnitTest {
hashtags.add(clusterStartUp.getKeyOnServer("rename", 2));
hashtags.add(clusterStartUp.getKeyOnServer("rename", 3));
- Runnable task1 = () -> renamePerformAndVerify(1, 10000, hashtags.get(0), running);
- Runnable task2 = () -> renamePerformAndVerify(2, 10000, hashtags.get(1), running);
- Runnable task3 = () -> renamePerformAndVerify(3, 10000, hashtags.get(2), running);
+ Runnable task1 = () -> renamePerformAndVerify(1, 10000, hashtags.get(0), running, false);
+ Runnable task2 = () -> renamePerformAndVerify(2, 10000, hashtags.get(1), running, false);
+ Runnable task3 = () -> renamePerformAndVerify(3, 10000, hashtags.get(2), running, false);
Future<Void> future1 = executor.runAsync(task1);
Future<Void> future2 = executor.runAsync(task2);
@@ -239,24 +298,33 @@ public class RenameDUnitTest {
future3.get();
}
- private void renamePerformAndVerify(int index, int minimumIterations, String hashtag,
- AtomicBoolean isRunning) {
- String baseKey = "{" + hashtag + "}-key-" + index;
+ private void renamePerformAndVerify(final int index, final int minimumIterations,
+ final String hashtag, final AtomicBoolean isRunning, final boolean continueOnError) {
+ final String baseKey = "{" + hashtag + "}-key-" + index;
jedisCluster.set(baseKey + "-0", "value");
int iterationCount = 0;
while (iterationCount < minimumIterations || isRunning.get()) {
- String oldKey = baseKey + "-" + iterationCount;
- String newKey = baseKey + "-" + (iterationCount + 1);
+ final String oldKey = baseKey + "-" + iterationCount;
+ final String newKey = baseKey + "-" + (iterationCount + 1);
+
+ // it's possible previous rename failed, so make sure oldKey exists
+ jedisCluster.setnx(oldKey, "value");
+
try {
jedisCluster.rename(oldKey, newKey);
- } catch (Exception ex) {
- isRunning.set(false);
- throw new RuntimeException("Exception performing RENAME " + oldKey + " " + newKey, ex);
+ } catch (final Exception ex) {
+ if (continueOnError) {
+ logger.warn("Exception performing RENAME " + oldKey + " " + newKey, ex);
+ } else {
+ isRunning.set(false);
+ throw new RuntimeException("Exception performing RENAME " + oldKey + " " + newKey, ex);
+ }
}
- assertThat(jedisCluster.exists(newKey))
- .as("key " + newKey + " should exist").isTrue();
+ // verify that renaming occurred as a unit of work / all or nothing
+ assertThat(jedisCluster.exists(newKey)).isEqualTo(!jedisCluster.exists(oldKey));
+
iterationCount += 1;
}
}
@@ -268,4 +336,17 @@ public class RenameDUnitTest {
throw new RuntimeException(e);
}
}
+
+ private static void rebalanceAllRegions(MemberVM vm) {
+ vm.invoke("Running rebalance", () -> {
+ final ResourceManager manager = ClusterStartupRule.getCache().getResourceManager();
+ final RebalanceFactory factory = manager.createRebalanceFactory();
+ try {
+ factory.start().getResults();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
}
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/key/AbstractRenameExecutor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/key/AbstractRenameExecutor.java
index d022e20..eaeb2d2 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/key/AbstractRenameExecutor.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/key/AbstractRenameExecutor.java
@@ -53,9 +53,9 @@ public abstract class AbstractRenameExecutor implements CommandExecutor {
protected static boolean rename(ExecutionHandlerContext context, RedisKey oldKey, RedisKey newKey,
boolean ifTargetNotExists) {
- List<RedisKey> lockOrdering = Arrays.asList(oldKey, newKey);
+ final List<RedisKey> keysToLock = Arrays.asList(oldKey, newKey);
- return context.lockedExecute(oldKey, lockOrdering,
+ return context.lockedExecuteInTransaction(oldKey, keysToLock,
() -> context.getRedisData(oldKey)
.rename(context.getRegion(), oldKey, newKey, ifTargetNotExists));
}