You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/08/11 02:45:31 UTC
[geode] branch develop updated: Revert "GEODE-8393: change memberDeparted to disconnect the connection (#5431)" (#5441)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d19368f Revert "GEODE-8393: change memberDeparted to disconnect the connection (#5431)" (#5441)
d19368f is described below
commit d19368f7eb676026d1f921eac4106ff465131cc0
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Mon Aug 10 19:44:44 2020 -0700
Revert "GEODE-8393: change memberDeparted to disconnect the connection (#5431)" (#5441)
This reverts commit 0a91484b05f1caffa8cc3a59cc7fc38abe4376ed.
---
.../executor/CrashAndNoRepeatDUnitTest.java | 137 ++++++++++++---------
.../executor/hash/HashesAndCrashesDUnitTest.java | 34 ++++-
.../internal/executor/pubsub/PubSubDUnitTest.java | 17 ++-
.../geode/redis/session/SessionDUnitTest.java | 57 ++++++---
.../internal/netty/ExecutionHandlerContext.java | 20 ++-
5 files changed, 179 insertions(+), 86 deletions(-)
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
index b3d7741..b24ee7e 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
@@ -26,25 +26,30 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
import org.apache.logging.log4j.Logger;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.exceptions.JedisConnectionException;
import org.apache.geode.cache.control.RebalanceFactory;
import org.apache.geode.cache.control.RebalanceResults;
import org.apache.geode.cache.control.ResourceManager;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@@ -68,9 +73,10 @@ public class CrashAndNoRepeatDUnitTest {
private static MemberVM server3;
private static int[] redisPorts;
- private static final String LOCAL_HOST = "127.0.0.1";
- private static final int JEDIS_TIMEOUT =
- Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+ private RedisClient redisClient;
+ private StatefulRedisConnection<String, String> connection;
+ private RedisCommands<String, String> commands;
@Rule
public ExecutorServiceRule executor = new ExecutorServiceRule();
@@ -113,11 +119,36 @@ public class CrashAndNoRepeatDUnitTest {
}
- private synchronized Jedis connect(AtomicReference<Jedis> jedisRef) {
- jedisRef.set(new Jedis(LOCAL_HOST, redisPorts[0], JEDIS_TIMEOUT));
- return jedisRef.get();
+ @Before
+ public void before() {
+ String redisPort1 = "" + redisPorts[0];
+ String redisPort2 = "" + redisPorts[1];
+ String redisPort3 = "" + redisPorts[2];
+ // For now only tell the client about redisPort1.
+ // That server is never restarted so clients should
+ // never fail due to the server they are connected to failing.
+ DUnitSocketAddressResolver dnsResolver =
+ new DUnitSocketAddressResolver(new String[] {redisPort1});
+
+ ClientResources resources = ClientResources.builder()
+ .socketAddressResolver(dnsResolver)
+ .build();
+
+ redisClient = RedisClient.create(resources, "redis://localhost");
+ redisClient.setOptions(ClientOptions.builder()
+ .autoReconnect(true)
+ .build());
+ connection = redisClient.connect();
+ commands = connection.sync();
+ }
+
+ @After
+ public void after() {
+ connection.close();
+ redisClient.shutdown();
}
+
private MemberVM startRedisVM(int vmID, int redisPort) {
int locatorPort = locator.getPort();
return clusterStartUp.startServerVM(vmID,
@@ -135,10 +166,15 @@ public class CrashAndNoRepeatDUnitTest {
AtomicBoolean running3 = new AtomicBoolean(true);
AtomicBoolean running4 = new AtomicBoolean(false);
- Runnable task1 = () -> appendPerformAndVerify(1, 20000, running1);
- Runnable task2 = () -> appendPerformAndVerify(2, 20000, running2);
- Runnable task3 = () -> appendPerformAndVerify(3, 20000, running3);
- Runnable task4 = () -> appendPerformAndVerify(4, 1000, running4);
+ Runnable task1 = null;
+ Runnable task2 = null;
+ Runnable task3 = null;
+ Runnable task4 = null;
+
+ task1 = () -> appendPerformAndVerify(0, 20000, running1);
+ task2 = () -> appendPerformAndVerify(1, 20000, running2);
+ task3 = () -> appendPerformAndVerify(3, 20000, running3);
+ task4 = () -> appendPerformAndVerify(4, 1000, running4);
Future<Void> future1 = executor.runAsync(task1);
Future<Void> future2 = executor.runAsync(task2);
@@ -179,10 +215,15 @@ public class CrashAndNoRepeatDUnitTest {
AtomicBoolean running3 = new AtomicBoolean(true);
AtomicBoolean running4 = new AtomicBoolean(false);
- Runnable task1 = () -> renamePerformAndVerify(1, 20000, running1);
- Runnable task2 = () -> renamePerformAndVerify(2, 20000, running2);
- Runnable task3 = () -> renamePerformAndVerify(3, 20000, running3);
- Runnable task4 = () -> renamePerformAndVerify(4, 1000, running4);
+ Runnable task1 = null;
+ Runnable task2 = null;
+ Runnable task3 = null;
+ Runnable task4 = null;
+
+ task1 = () -> renamePerformAndVerify(0, 20000, running1);
+ task2 = () -> renamePerformAndVerify(1, 20000, running2);
+ task3 = () -> renamePerformAndVerify(3, 20000, running3);
+ task4 = () -> renamePerformAndVerify(4, 1000, running4);
Future<Void> future1 = executor.runAsync(task1);
Future<Void> future2 = executor.runAsync(task2);
@@ -219,8 +260,8 @@ public class CrashAndNoRepeatDUnitTest {
while (true) {
try {
return supplier.get();
- } catch (JedisConnectionException ex) {
- if (!ex.getMessage().contains("Unexpected end of stream.")) {
+ } catch (RedisCommandExecutionException ex) {
+ if (!ex.getMessage().contains("memberDeparted")) {
throw ex;
}
}
@@ -228,85 +269,67 @@ public class CrashAndNoRepeatDUnitTest {
}
private void renamePerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning) {
- final AtomicReference<Jedis> jedisRef = new AtomicReference<>();
- connect(jedisRef);
String newKey = null;
String baseKey = "rename-key-" + index;
- jedisRef.get().set(baseKey + "-0", "value");
+ commands.set(baseKey + "-0", "value");
int iterationCount = 0;
while (iterationCount < minimumIterations || isRunning.get()) {
String oldKey = baseKey + "-" + iterationCount;
newKey = baseKey + "-" + (iterationCount + 1);
try {
- jedisRef.get().rename(oldKey, newKey);
+ commands.rename(oldKey, newKey);
iterationCount += 1;
- } catch (JedisConnectionException ex) {
- if (ex.getMessage().contains("Unexpected end of stream.")) {
- if (!doWithRetry(() -> connect(jedisRef).exists(oldKey))) {
+ } catch (RedisCommandExecutionException e) {
+ if (e.getMessage().contains("memberDeparted")) {
+ if (doWithRetry(() -> commands.exists(oldKey)) == 0) {
iterationCount += 1;
}
- } else if (ex.getMessage().contains("no such key")) {
+ } else if (e.getMessage().contains("no such key")) {
iterationCount += 1;
} else {
- throw ex;
+ throw e;
}
}
}
- assertThat(jedisRef.get().keys(baseKey + "-*").size()).isEqualTo(1);
- assertThat(jedisRef.get().exists(newKey)).isTrue();
+ assertThat(commands.keys(baseKey + "-*").size()).isEqualTo(1);
+ assertThat(commands.exists(newKey)).isEqualTo(1);
logger.info("--->>> RENAME test ran {} iterations", iterationCount);
- jedisRef.get().disconnect();
}
private void appendPerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning) {
String key = "append-key-" + index;
int iterationCount = 0;
- final AtomicReference<Jedis> jedisRef = new AtomicReference<>();
- connect(jedisRef);
while (iterationCount < minimumIterations || isRunning.get()) {
- String appendString = "-" + key + "-" + iterationCount + "-";
+ String appendString = "" + iterationCount % 2;
try {
- jedisRef.get().append(key, appendString);
+ commands.append(key, appendString);
iterationCount += 1;
- } catch (JedisConnectionException ex) {
- if (ex.getMessage().contains("Unexpected end of stream.")) {
- if (!doWithRetry(() -> connect(jedisRef).get(key)).endsWith(appendString)) {
- // give some time for the in-flight op to be done
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- Thread.interrupted();
- }
- }
- if (doWithRetry(() -> connect(jedisRef).get(key)).endsWith(appendString)) {
+ } catch (RedisCommandExecutionException e) {
+ if (e.getMessage().contains("memberDeparted")) {
+ if (doWithRetry(() -> commands.get(key)).endsWith(appendString)) {
iterationCount += 1;
}
} else {
- throw ex;
+ throw e;
}
}
}
- String storedString = jedisRef.get().get(key);
- int idx = 0;
+ String storedString = commands.get(key);
for (int i = 0; i < iterationCount; i++) {
- String expectedValue = "-" + key + "-" + i + "-";
- String foundValue = storedString.substring(idx, idx + expectedValue.length());
- if (!expectedValue.equals(foundValue)) {
- Assert.fail("unexpected " + foundValue + " at index " + i + " iterationCount="
- + iterationCount + " in string "
+ String expectedValue = "" + i % 2;
+ if (!expectedValue.equals("" + storedString.charAt(i))) {
+ Assert.fail("unexpected " + storedString.charAt(i) + " at index " + i + " in string "
+ storedString);
break;
}
- idx += expectedValue.length();
}
logger.info("--->>> APPEND test ran {} iterations", iterationCount);
- jedisRef.get().disconnect();
}
private static void rebalanceAllRegions(MemberVM vm) {
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
index f1e4be6..2613fc2 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisClient;
@@ -240,6 +241,18 @@ public class HashesAndCrashesDUnitTest {
future3.get();
}
+ private <T> T doWithRetry(Supplier<T> supplier) {
+ while (true) {
+ try {
+ return supplier.get();
+ } catch (RedisCommandExecutionException ex) {
+ if (!ex.getMessage().contains("memberDeparted")) {
+ throw ex;
+ }
+ }
+ }
+ }
+
private void hsetPerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning) {
String key = "hset-key-" + index;
int iterationCount = 0;
@@ -249,7 +262,12 @@ public class HashesAndCrashesDUnitTest {
try {
commands.hset(key, fieldName, "value-" + iterationCount);
iterationCount += 1;
- } catch (RedisCommandExecutionException ignore) {
+ } catch (RedisCommandExecutionException e) {
+ if (e.getMessage().contains("memberDeparted")) {
+ if (doWithRetry(() -> commands.hexists(key, fieldName))) {
+ iterationCount += 1;
+ }
+ }
}
}
@@ -271,7 +289,12 @@ public class HashesAndCrashesDUnitTest {
try {
commands.sadd(key, member);
iterationCount += 1;
- } catch (RedisCommandExecutionException ignore) {
+ } catch (RedisCommandExecutionException e) {
+ if (e.getMessage().contains("memberDeparted")) {
+ if (doWithRetry(() -> commands.sismember(key, member))) {
+ iterationCount += 1;
+ }
+ }
}
}
@@ -295,7 +318,12 @@ public class HashesAndCrashesDUnitTest {
try {
commands.set(key, key);
iterationCount += 1;
- } catch (RedisCommandExecutionException ignore) {
+ } catch (RedisCommandExecutionException e) {
+ if (e.getMessage().contains("memberDeparted")) {
+ if (doWithRetry(() -> commands.exists(key)) == 1) {
+ iterationCount += 1;
+ }
+ }
}
}
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
index 8ebce3f..51eb3b7 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
@@ -37,6 +37,7 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.exceptions.JedisException;
import org.apache.geode.redis.MockSubscriber;
import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -177,7 +178,21 @@ public class PubSubDUnitTest {
cluster.crashVM(2);
- result = publisher1.publish(CHANNEL_NAME, "hello again");
+ // Depending on the timing of this call, it may catch a function error (due to member departed)
+ // and return 0 as a result. Regardless, it should NOT hang.
+ boolean published = false;
+ do {
+ try {
+ result = publisher1.publish(CHANNEL_NAME, "hello again");
+ published = true;
+ } catch (JedisException ex) {
+ if (ex.getMessage().contains("memberDeparted")) {
+ // retry
+ } else {
+ throw ex;
+ }
+ }
+ } while (!published);
assertThat(result).isLessThanOrEqualTo(1);
mockSubscriber1.unsubscribe(CHANNEL_NAME);
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
index 447e1a8..7b0c250 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
@@ -135,15 +135,27 @@ public abstract class SessionDUnitTest {
protected String createNewSessionWithNote(int sessionApp, String note) {
HttpEntity<String> request = new HttpEntity<>(note);
+ boolean noteAdded = false;
String sessionCookie = "";
- HttpHeaders resultHeaders = new RestTemplate()
- .postForEntity(
- "http://localhost:" + ports.get(sessionApp)
- + "/addSessionNote",
- request,
- String.class)
- .getHeaders();
- sessionCookie = resultHeaders.getFirst("Set-Cookie");
+ do {
+ try {
+ HttpHeaders resultHeaders = new RestTemplate()
+ .postForEntity(
+ "http://localhost:" + ports.get(sessionApp)
+ + "/addSessionNote",
+ request,
+ String.class)
+ .getHeaders();
+ sessionCookie = resultHeaders.getFirst("Set-Cookie");
+ noteAdded = true;
+ } catch (HttpServerErrorException e) {
+ if (e.getMessage().contains("memberDeparted")) {
+ // retry
+ } else {
+ throw e;
+ }
+ }
+ } while (!noteAdded);
return sessionCookie;
}
@@ -181,12 +193,29 @@ public abstract class SessionDUnitTest {
List<String> notes = new ArrayList<>();
Collections.addAll(notes, getSessionNotes(sessionApp, sessionCookie));
HttpEntity<String> request = new HttpEntity<>(note, requestHeaders);
- new RestTemplate()
- .postForEntity(
- "http://localhost:" + ports.get(sessionApp) + "/addSessionNote",
- request,
- String.class)
- .getHeaders();
+ boolean noteAdded = false;
+ do {
+ try {
+ new RestTemplate()
+ .postForEntity(
+ "http://localhost:" + ports.get(sessionApp) + "/addSessionNote",
+ request,
+ String.class)
+ .getHeaders();
+ noteAdded = true;
+ } catch (HttpServerErrorException e) {
+ if (e.getMessage().contains("memberDeparted")) {
+ List<String> updatedNotes = new ArrayList<>();
+ Collections.addAll(updatedNotes, getSessionNotes(sessionApp, sessionCookie));
+ if (notes.containsAll(updatedNotes)) {
+ noteAdded = true;
+ }
+ e.printStackTrace();
+ } else {
+ throw e;
+ }
+ }
+ } while (!noteAdded);
}
protected String getSessionId(String sessionCookie) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 9eb06ed..dc34dc1 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -131,18 +131,15 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- RedisResponse exceptionResponse = getExceptionResponse(ctx, cause);
- if (exceptionResponse != null) {
- writeToChannel(exceptionResponse);
+ if (cause instanceof IOException) {
+ channelInactive(ctx);
+ return;
}
+ writeToChannel(getExceptionResponse(ctx, cause));
}
private RedisResponse getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) {
RedisResponse response;
- if (cause instanceof IOException) {
- channelInactive(ctx);
- return null;
- }
if (cause instanceof FunctionException
&& !(cause instanceof FunctionInvocationTargetException)) {
@@ -169,10 +166,11 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
response = RedisResponse.error(cause.getMessage());
} else if (cause instanceof FunctionInvocationTargetException) {
// This indicates a member departed
- logger.warn(
- "Closing client connection because one of the servers doing this operation departed.");
- channelInactive(ctx);
- response = null;
+ String errorMsg = cause.getMessage();
+ if (!errorMsg.contains("memberDeparted")) {
+ errorMsg = "memberDeparted: " + errorMsg;
+ }
+ response = RedisResponse.error(errorMsg);
} else {
if (logger.isErrorEnabled()) {
logger.error("GeodeRedisServer-Unexpected error handler for " + ctx.channel(), cause);