You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/08/11 02:45:31 UTC

[geode] branch develop updated: Revert "GEODE-8393: change memberDeparted to disconnect the connection (#5431)" (#5441)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new d19368f  Revert "GEODE-8393: change memberDeparted to disconnect the connection (#5431)" (#5441)
d19368f is described below

commit d19368f7eb676026d1f921eac4106ff465131cc0
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Mon Aug 10 19:44:44 2020 -0700

    Revert "GEODE-8393: change memberDeparted to disconnect the connection (#5431)" (#5441)
    
    This reverts commit 0a91484b05f1caffa8cc3a59cc7fc38abe4376ed.
---
 .../executor/CrashAndNoRepeatDUnitTest.java        | 137 ++++++++++++---------
 .../executor/hash/HashesAndCrashesDUnitTest.java   |  34 ++++-
 .../internal/executor/pubsub/PubSubDUnitTest.java  |  17 ++-
 .../geode/redis/session/SessionDUnitTest.java      |  57 ++++++---
 .../internal/netty/ExecutionHandlerContext.java    |  20 ++-
 5 files changed, 179 insertions(+), 86 deletions(-)

diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
index b3d7741..b24ee7e 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/CrashAndNoRepeatDUnitTest.java
@@ -26,25 +26,30 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.util.Properties;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
+import io.lettuce.core.ClientOptions;
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisCommandExecutionException;
+import io.lettuce.core.api.StatefulRedisConnection;
+import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.resource.ClientResources;
 import org.apache.logging.log4j.Logger;
+import org.junit.After;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
-import redis.clients.jedis.Jedis;
-import redis.clients.jedis.exceptions.JedisConnectionException;
 
 import org.apache.geode.cache.control.RebalanceFactory;
 import org.apache.geode.cache.control.RebalanceResults;
 import org.apache.geode.cache.control.ResourceManager;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.redis.session.springRedisTestApplication.config.DUnitSocketAddressResolver;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@@ -68,9 +73,10 @@ public class CrashAndNoRepeatDUnitTest {
   private static MemberVM server3;
 
   private static int[] redisPorts;
-  private static final String LOCAL_HOST = "127.0.0.1";
-  private static final int JEDIS_TIMEOUT =
-      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  private RedisClient redisClient;
+  private StatefulRedisConnection<String, String> connection;
+  private RedisCommands<String, String> commands;
 
   @Rule
   public ExecutorServiceRule executor = new ExecutorServiceRule();
@@ -113,11 +119,36 @@ public class CrashAndNoRepeatDUnitTest {
 
   }
 
-  private synchronized Jedis connect(AtomicReference<Jedis> jedisRef) {
-    jedisRef.set(new Jedis(LOCAL_HOST, redisPorts[0], JEDIS_TIMEOUT));
-    return jedisRef.get();
+  @Before
+  public void before() {
+    String redisPort1 = "" + redisPorts[0];
+    String redisPort2 = "" + redisPorts[1];
+    String redisPort3 = "" + redisPorts[2];
+    // For now only tell the client about redisPort1.
+    // That server is never restarted so clients should
+    // never fail due to the server they are connected to failing.
+    DUnitSocketAddressResolver dnsResolver =
+        new DUnitSocketAddressResolver(new String[] {redisPort1});
+
+    ClientResources resources = ClientResources.builder()
+        .socketAddressResolver(dnsResolver)
+        .build();
+
+    redisClient = RedisClient.create(resources, "redis://localhost");
+    redisClient.setOptions(ClientOptions.builder()
+        .autoReconnect(true)
+        .build());
+    connection = redisClient.connect();
+    commands = connection.sync();
+  }
+
+  @After
+  public void after() {
+    connection.close();
+    redisClient.shutdown();
   }
 
+
   private MemberVM startRedisVM(int vmID, int redisPort) {
     int locatorPort = locator.getPort();
     return clusterStartUp.startServerVM(vmID,
@@ -135,10 +166,15 @@ public class CrashAndNoRepeatDUnitTest {
     AtomicBoolean running3 = new AtomicBoolean(true);
     AtomicBoolean running4 = new AtomicBoolean(false);
 
-    Runnable task1 = () -> appendPerformAndVerify(1, 20000, running1);
-    Runnable task2 = () -> appendPerformAndVerify(2, 20000, running2);
-    Runnable task3 = () -> appendPerformAndVerify(3, 20000, running3);
-    Runnable task4 = () -> appendPerformAndVerify(4, 1000, running4);
+    Runnable task1 = null;
+    Runnable task2 = null;
+    Runnable task3 = null;
+    Runnable task4 = null;
+
+    task1 = () -> appendPerformAndVerify(0, 20000, running1);
+    task2 = () -> appendPerformAndVerify(1, 20000, running2);
+    task3 = () -> appendPerformAndVerify(3, 20000, running3);
+    task4 = () -> appendPerformAndVerify(4, 1000, running4);
 
     Future<Void> future1 = executor.runAsync(task1);
     Future<Void> future2 = executor.runAsync(task2);
@@ -179,10 +215,15 @@ public class CrashAndNoRepeatDUnitTest {
     AtomicBoolean running3 = new AtomicBoolean(true);
     AtomicBoolean running4 = new AtomicBoolean(false);
 
-    Runnable task1 = () -> renamePerformAndVerify(1, 20000, running1);
-    Runnable task2 = () -> renamePerformAndVerify(2, 20000, running2);
-    Runnable task3 = () -> renamePerformAndVerify(3, 20000, running3);
-    Runnable task4 = () -> renamePerformAndVerify(4, 1000, running4);
+    Runnable task1 = null;
+    Runnable task2 = null;
+    Runnable task3 = null;
+    Runnable task4 = null;
+
+    task1 = () -> renamePerformAndVerify(0, 20000, running1);
+    task2 = () -> renamePerformAndVerify(1, 20000, running2);
+    task3 = () -> renamePerformAndVerify(3, 20000, running3);
+    task4 = () -> renamePerformAndVerify(4, 1000, running4);
 
     Future<Void> future1 = executor.runAsync(task1);
     Future<Void> future2 = executor.runAsync(task2);
@@ -219,8 +260,8 @@ public class CrashAndNoRepeatDUnitTest {
     while (true) {
       try {
         return supplier.get();
-      } catch (JedisConnectionException ex) {
-        if (!ex.getMessage().contains("Unexpected end of stream.")) {
+      } catch (RedisCommandExecutionException ex) {
+        if (!ex.getMessage().contains("memberDeparted")) {
           throw ex;
         }
       }
@@ -228,85 +269,67 @@ public class CrashAndNoRepeatDUnitTest {
   }
 
   private void renamePerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning) {
-    final AtomicReference<Jedis> jedisRef = new AtomicReference<>();
-    connect(jedisRef);
     String newKey = null;
     String baseKey = "rename-key-" + index;
-    jedisRef.get().set(baseKey + "-0", "value");
+    commands.set(baseKey + "-0", "value");
     int iterationCount = 0;
 
     while (iterationCount < minimumIterations || isRunning.get()) {
       String oldKey = baseKey + "-" + iterationCount;
       newKey = baseKey + "-" + (iterationCount + 1);
       try {
-        jedisRef.get().rename(oldKey, newKey);
+        commands.rename(oldKey, newKey);
         iterationCount += 1;
-      } catch (JedisConnectionException ex) {
-        if (ex.getMessage().contains("Unexpected end of stream.")) {
-          if (!doWithRetry(() -> connect(jedisRef).exists(oldKey))) {
+      } catch (RedisCommandExecutionException e) {
+        if (e.getMessage().contains("memberDeparted")) {
+          if (doWithRetry(() -> commands.exists(oldKey)) == 0) {
             iterationCount += 1;
           }
-        } else if (ex.getMessage().contains("no such key")) {
+        } else if (e.getMessage().contains("no such key")) {
           iterationCount += 1;
         } else {
-          throw ex;
+          throw e;
         }
       }
     }
 
-    assertThat(jedisRef.get().keys(baseKey + "-*").size()).isEqualTo(1);
-    assertThat(jedisRef.get().exists(newKey)).isTrue();
+    assertThat(commands.keys(baseKey + "-*").size()).isEqualTo(1);
+    assertThat(commands.exists(newKey)).isEqualTo(1);
 
     logger.info("--->>> RENAME test ran {} iterations", iterationCount);
-    jedisRef.get().disconnect();
   }
 
   private void appendPerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning) {
     String key = "append-key-" + index;
     int iterationCount = 0;
-    final AtomicReference<Jedis> jedisRef = new AtomicReference<>();
-    connect(jedisRef);
 
     while (iterationCount < minimumIterations || isRunning.get()) {
-      String appendString = "-" + key + "-" + iterationCount + "-";
+      String appendString = "" + iterationCount % 2;
       try {
-        jedisRef.get().append(key, appendString);
+        commands.append(key, appendString);
         iterationCount += 1;
-      } catch (JedisConnectionException ex) {
-        if (ex.getMessage().contains("Unexpected end of stream.")) {
-          if (!doWithRetry(() -> connect(jedisRef).get(key)).endsWith(appendString)) {
-            // give some time for the in-flight op to be done
-            try {
-              Thread.sleep(1000);
-            } catch (InterruptedException e) {
-              Thread.interrupted();
-            }
-          }
-          if (doWithRetry(() -> connect(jedisRef).get(key)).endsWith(appendString)) {
+      } catch (RedisCommandExecutionException e) {
+        if (e.getMessage().contains("memberDeparted")) {
+          if (doWithRetry(() -> commands.get(key)).endsWith(appendString)) {
             iterationCount += 1;
           }
         } else {
-          throw ex;
+          throw e;
         }
       }
     }
 
-    String storedString = jedisRef.get().get(key);
-    int idx = 0;
+    String storedString = commands.get(key);
     for (int i = 0; i < iterationCount; i++) {
-      String expectedValue = "-" + key + "-" + i + "-";
-      String foundValue = storedString.substring(idx, idx + expectedValue.length());
-      if (!expectedValue.equals(foundValue)) {
-        Assert.fail("unexpected " + foundValue + " at index " + i + " iterationCount="
-            + iterationCount + " in string "
+      String expectedValue = "" + i % 2;
+      if (!expectedValue.equals("" + storedString.charAt(i))) {
+        Assert.fail("unexpected " + storedString.charAt(i) + " at index " + i + " in string "
             + storedString);
         break;
       }
-      idx += expectedValue.length();
     }
 
     logger.info("--->>> APPEND test ran {} iterations", iterationCount);
-    jedisRef.get().disconnect();
   }
 
   private static void rebalanceAllRegions(MemberVM vm) {
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
index f1e4be6..2613fc2 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HashesAndCrashesDUnitTest.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import io.lettuce.core.ClientOptions;
 import io.lettuce.core.RedisClient;
@@ -240,6 +241,18 @@ public class HashesAndCrashesDUnitTest {
     future3.get();
   }
 
+  private <T> T doWithRetry(Supplier<T> supplier) {
+    while (true) {
+      try {
+        return supplier.get();
+      } catch (RedisCommandExecutionException ex) {
+        if (!ex.getMessage().contains("memberDeparted")) {
+          throw ex;
+        }
+      }
+    }
+  }
+
   private void hsetPerformAndVerify(int index, int minimumIterations, AtomicBoolean isRunning) {
     String key = "hset-key-" + index;
     int iterationCount = 0;
@@ -249,7 +262,12 @@ public class HashesAndCrashesDUnitTest {
       try {
         commands.hset(key, fieldName, "value-" + iterationCount);
         iterationCount += 1;
-      } catch (RedisCommandExecutionException ignore) {
+      } catch (RedisCommandExecutionException e) {
+        if (e.getMessage().contains("memberDeparted")) {
+          if (doWithRetry(() -> commands.hexists(key, fieldName))) {
+            iterationCount += 1;
+          }
+        }
       }
     }
 
@@ -271,7 +289,12 @@ public class HashesAndCrashesDUnitTest {
       try {
         commands.sadd(key, member);
         iterationCount += 1;
-      } catch (RedisCommandExecutionException ignore) {
+      } catch (RedisCommandExecutionException e) {
+        if (e.getMessage().contains("memberDeparted")) {
+          if (doWithRetry(() -> commands.sismember(key, member))) {
+            iterationCount += 1;
+          }
+        }
       }
     }
 
@@ -295,7 +318,12 @@ public class HashesAndCrashesDUnitTest {
       try {
         commands.set(key, key);
         iterationCount += 1;
-      } catch (RedisCommandExecutionException ignore) {
+      } catch (RedisCommandExecutionException e) {
+        if (e.getMessage().contains("memberDeparted")) {
+          if (doWithRetry(() -> commands.exists(key)) == 1) {
+            iterationCount += 1;
+          }
+        }
       }
     }
 
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
index 8ebce3f..51eb3b7 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
@@ -37,6 +37,7 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
+import redis.clients.jedis.exceptions.JedisException;
 
 import org.apache.geode.redis.MockSubscriber;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -177,7 +178,21 @@ public class PubSubDUnitTest {
 
     cluster.crashVM(2);
 
-    result = publisher1.publish(CHANNEL_NAME, "hello again");
+    // Depending on the timing of this call, it may catch a function error (due to member departed)
+    // and return 0 as a result. Regardless, it should NOT hang.
+    boolean published = false;
+    do {
+      try {
+        result = publisher1.publish(CHANNEL_NAME, "hello again");
+        published = true;
+      } catch (JedisException ex) {
+        if (ex.getMessage().contains("memberDeparted")) {
+          // retry
+        } else {
+          throw ex;
+        }
+      }
+    } while (!published);
     assertThat(result).isLessThanOrEqualTo(1);
 
     mockSubscriber1.unsubscribe(CHANNEL_NAME);
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
index 447e1a8..7b0c250 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/SessionDUnitTest.java
@@ -135,15 +135,27 @@ public abstract class SessionDUnitTest {
 
   protected String createNewSessionWithNote(int sessionApp, String note) {
     HttpEntity<String> request = new HttpEntity<>(note);
+    boolean noteAdded = false;
     String sessionCookie = "";
-    HttpHeaders resultHeaders = new RestTemplate()
-        .postForEntity(
-            "http://localhost:" + ports.get(sessionApp)
-                + "/addSessionNote",
-            request,
-            String.class)
-        .getHeaders();
-    sessionCookie = resultHeaders.getFirst("Set-Cookie");
+    do {
+      try {
+        HttpHeaders resultHeaders = new RestTemplate()
+            .postForEntity(
+                "http://localhost:" + ports.get(sessionApp)
+                    + "/addSessionNote",
+                request,
+                String.class)
+            .getHeaders();
+        sessionCookie = resultHeaders.getFirst("Set-Cookie");
+        noteAdded = true;
+      } catch (HttpServerErrorException e) {
+        if (e.getMessage().contains("memberDeparted")) {
+          // retry
+        } else {
+          throw e;
+        }
+      }
+    } while (!noteAdded);
 
     return sessionCookie;
   }
@@ -181,12 +193,29 @@ public abstract class SessionDUnitTest {
     List<String> notes = new ArrayList<>();
     Collections.addAll(notes, getSessionNotes(sessionApp, sessionCookie));
     HttpEntity<String> request = new HttpEntity<>(note, requestHeaders);
-    new RestTemplate()
-        .postForEntity(
-            "http://localhost:" + ports.get(sessionApp) + "/addSessionNote",
-            request,
-            String.class)
-        .getHeaders();
+    boolean noteAdded = false;
+    do {
+      try {
+        new RestTemplate()
+            .postForEntity(
+                "http://localhost:" + ports.get(sessionApp) + "/addSessionNote",
+                request,
+                String.class)
+            .getHeaders();
+        noteAdded = true;
+      } catch (HttpServerErrorException e) {
+        if (e.getMessage().contains("memberDeparted")) {
+          List<String> updatedNotes = new ArrayList<>();
+          Collections.addAll(updatedNotes, getSessionNotes(sessionApp, sessionCookie));
+          if (notes.containsAll(updatedNotes)) {
+            noteAdded = true;
+          }
+          e.printStackTrace();
+        } else {
+          throw e;
+        }
+      }
+    } while (!noteAdded);
   }
 
   protected String getSessionId(String sessionCookie) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 9eb06ed..dc34dc1 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -131,18 +131,15 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
    */
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-    RedisResponse exceptionResponse = getExceptionResponse(ctx, cause);
-    if (exceptionResponse != null) {
-      writeToChannel(exceptionResponse);
+    if (cause instanceof IOException) {
+      channelInactive(ctx);
+      return;
     }
+    writeToChannel(getExceptionResponse(ctx, cause));
   }
 
   private RedisResponse getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) {
     RedisResponse response;
-    if (cause instanceof IOException) {
-      channelInactive(ctx);
-      return null;
-    }
 
     if (cause instanceof FunctionException
         && !(cause instanceof FunctionInvocationTargetException)) {
@@ -169,10 +166,11 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
       response = RedisResponse.error(cause.getMessage());
     } else if (cause instanceof FunctionInvocationTargetException) {
       // This indicates a member departed
-      logger.warn(
-          "Closing client connection because one of the servers doing this operation departed.");
-      channelInactive(ctx);
-      response = null;
+      String errorMsg = cause.getMessage();
+      if (!errorMsg.contains("memberDeparted")) {
+        errorMsg = "memberDeparted: " + errorMsg;
+      }
+      response = RedisResponse.error(errorMsg);
     } else {
       if (logger.isErrorEnabled()) {
         logger.error("GeodeRedisServer-Unexpected error handler for " + ctx.channel(), cause);