You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/08/31 23:23:03 UTC
[geode] branch develop updated: GEODE-8333: Change Redis adapter
threading model - fixes pubsub issues (#5488)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 27b8e47 GEODE-8333: Change Redis adapter threading model - fixes pubsub issues (#5488)
27b8e47 is described below
commit 27b8e47e9f6e433dc71bffc5420ebcb51f79797e
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Mon Aug 31 16:22:11 2020 -0700
GEODE-8333: Change Redis adapter threading model - fixes pubsub issues (#5488)
- Do not use Netty threads for the entire request lifecycle. Each
instance of ExecutionHandlerContext (essentially each client
connection) uses a command queue which is actioned by a single thread
taken from the Geode 'waiting pool'.
- Every SUBSCRIBEd client is moved to a separate EventLoopGroup so that
PUBLISHed messages are not sent back using the 'normal' Worker
EventLoopGroup. This avoids a deadlock issue where PUBLISHed messages
need to be sent using the same thread that the PUBLISH response needs
to happen on.
- Fix issues with PubSub where switching the EventLoopGroup may fail
(because a client has already closed the connection) thus resulting in
hanging PUBLISHers waiting for a Subscription to be marked as
'readyForPublish'.
- Consolidate various MockSubscriber classes
---
.../src/test/resources/expected-pom.xml | 6 +
.../gradle/plugins/DependencyConstraints.groovy | 1 +
geode-redis/build.gradle | 4 +
.../pubsub/PubSubNativeRedisAcceptanceTest.java | 5 +-
.../geode/redis/mocks/MockBinarySubscriber.java | 0
.../apache/geode/redis/mocks/MockSubscriber.java | 100 +++++++++--
.../org/apache/geode/redis/MockSubscriber.java | 62 -------
.../internal/executor/pubsub/PubSubDUnitTest.java | 196 +++++++++++++++------
.../geode/redis/session/RedisSessionDUnitTest.java | 45 +++++
.../pubsub/LettucePubSubIntegrationTest.java | 98 +++++++++++
.../executor/pubsub/PubSubIntegrationTest.java | 182 ++++++++++++++++++-
.../pubsub/SubscriptionsIntegrationTest.java | 6 +-
.../geode/redis/mocks/DummySubscription.java | 6 +
.../geode/redis/internal/GeodeRedisServer.java | 4 +-
.../redis/internal/executor/RedisResponse.java | 12 ++
.../executor/pubsub/PsubscribeExecutor.java | 46 +++--
.../internal/executor/pubsub/PublishExecutor.java | 9 +-
.../executor/pubsub/SubscribeExecutor.java | 37 +++-
.../apache/geode/redis/internal/netty/Command.java | 30 ++++
.../internal/netty/ExecutionHandlerContext.java | 175 +++++++++++-------
.../redis/internal/netty/NettyRedisServer.java | 11 +-
.../internal/pubsub/AbstractSubscription.java | 72 +++++---
.../redis/internal/pubsub/ChannelSubscription.java | 5 +-
.../redis/internal/pubsub/PatternSubscription.java | 5 +-
.../apache/geode/redis/internal/pubsub/PubSub.java | 8 +-
.../geode/redis/internal/pubsub/PubSubImpl.java | 6 +-
.../redis/internal/pubsub/SubscribeResult.java} | 42 ++---
.../geode/redis/internal/pubsub/Subscription.java | 7 +
.../geode/redis/internal/pubsub/Subscriptions.java | 23 ++-
.../redis/internal/pubsub/PubSubImplJUnitTest.java | 9 +-
.../internal/pubsub/SubscriptionsJUnitTest.java | 27 +--
31 files changed, 930 insertions(+), 309 deletions(-)
diff --git a/boms/geode-all-bom/src/test/resources/expected-pom.xml b/boms/geode-all-bom/src/test/resources/expected-pom.xml
index 9013d57..40e1920 100644
--- a/boms/geode-all-bom/src/test/resources/expected-pom.xml
+++ b/boms/geode-all-bom/src/test/resources/expected-pom.xml
@@ -548,6 +548,12 @@
<scope>compile</scope>
</dependency>
<dependency>
+ <groupId>io.lettuce</groupId>
+ <artifactId>lettuce-core</artifactId>
+ <version>5.2.1.RELEASE</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
<version>2.12.0</version>
diff --git a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
index 860dafe..37d2c2e 100644
--- a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
+++ b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
@@ -169,6 +169,7 @@ class DependencyConstraints implements Plugin<Project> {
api(group: 'org.testcontainers', name: 'testcontainers', version: '1.13.0')
api(group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.0')
api(group: 'redis.clients', name: 'jedis', version: '3.2.0')
+ api(group: 'io.lettuce', name: 'lettuce-core', version: '5.2.1.RELEASE')
api(group: 'xerces', name: 'xercesImpl', version: '2.12.0')
}
}
diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle
index 267ea9d..0d9a3f8 100644
--- a/geode-redis/build.gradle
+++ b/geode-redis/build.gradle
@@ -45,11 +45,14 @@ dependencies {
commonTestImplementation(project(':geode-junit'))
commonTestImplementation(project(':geode-dunit'))
+ commonTestImplementation('redis.clients:jedis')
integrationTestImplementation(project(':geode-dunit'))
integrationTestImplementation(project(':geode-junit'))
integrationTestImplementation(sourceSets.commonTest.output)
integrationTestImplementation('redis.clients:jedis')
+ integrationTestImplementation('io.lettuce:lettuce-core')
+ integrationTestImplementation('org.apache.logging.log4j:log4j-core')
integrationTestRuntime(project(':geode-log4j'))
acceptanceTestImplementation(sourceSets.integrationTest.output)
@@ -65,6 +68,7 @@ dependencies {
}
acceptanceTestImplementation('org.springframework.boot:spring-boot-starter-data-redis')
acceptanceTestImplementation('org.springframework.session:spring-session-data-redis')
+ acceptanceTestImplementation('org.apache.logging.log4j:log4j-core')
distributedTestCompile('org.apache.logging.log4j:log4j-core')
distributedTestImplementation(project(':geode-dunit'))
diff --git a/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java
index ee3ac5e..ce184ae 100644
--- a/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java
+++ b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java
@@ -42,8 +42,8 @@ public class PubSubNativeRedisAcceptanceTest extends PubSubIntegrationTest {
public static void setUp() {
redisContainer = new GenericContainer<>("redis:5.0.6").withExposedPorts(6379);
redisContainer.start();
- subscriber = new Jedis("localhost", redisContainer.getFirstMappedPort(), REDIS_CLIENT_TIMEOUT);
- publisher = new Jedis("localhost", redisContainer.getFirstMappedPort(), REDIS_CLIENT_TIMEOUT);
+ subscriber = new Jedis("localhost", redisContainer.getFirstMappedPort(), JEDIS_TIMEOUT);
+ publisher = new Jedis("localhost", redisContainer.getFirstMappedPort(), JEDIS_TIMEOUT);
}
@AfterClass
@@ -55,4 +55,5 @@ public class PubSubNativeRedisAcceptanceTest extends PubSubIntegrationTest {
public int getPort() {
return redisContainer.getFirstMappedPort();
}
+
}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockBinarySubscriber.java b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockBinarySubscriber.java
similarity index 100%
rename from geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockBinarySubscriber.java
rename to geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockBinarySubscriber.java
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
similarity index 54%
rename from geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
rename to geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
index e0ecb43..e55e4cb 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
+++ b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
@@ -11,7 +11,6 @@
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
- *
*/
package org.apache.geode.redis.mocks;
@@ -20,33 +19,112 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import redis.clients.jedis.Client;
import redis.clients.jedis.JedisPubSub;
public class MockSubscriber extends JedisPubSub {
+
+ private final CountDownLatch subscriptionLatch;
+ private final CountDownLatch unsubscriptionLatch;
private final List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());
private final List<String> receivedPMessages = Collections.synchronizedList(new ArrayList<>());
+ public final List<UnsubscribeInfo> unsubscribeInfos =
+ Collections.synchronizedList(new ArrayList<>());
+ public final List<UnsubscribeInfo> punsubscribeInfos =
+ Collections.synchronizedList(new ArrayList<>());
+ private String localSocketAddress;
+
+ public MockSubscriber() {
+ this(new CountDownLatch(1));
+ }
+
+ public MockSubscriber(CountDownLatch subscriptionLatch) {
+ this(subscriptionLatch, new CountDownLatch(1));
+ }
+
+ public MockSubscriber(CountDownLatch subscriptionLatch, CountDownLatch unsubscriptionLatch) {
+ this.subscriptionLatch = subscriptionLatch;
+ this.unsubscriptionLatch = unsubscriptionLatch;
+ }
+
+ @Override
+ public void proceed(Client client, String... channels) {
+ localSocketAddress = client.getSocket().getLocalSocketAddress().toString();
+ super.proceed(client, channels);
+ }
+
+ private void switchThreadName(String suffix) {
+ String threadName = Thread.currentThread().getName();
+ int suffixIndex = threadName.indexOf(" -- ");
+ if (suffixIndex >= 0) {
+ threadName = threadName.substring(0, suffixIndex);
+ }
+
+ threadName += " -- " + suffix + " [" + localSocketAddress + "]";
+ Thread.currentThread().setName(threadName);
+ }
public List<String> getReceivedMessages() {
- return new ArrayList<>(receivedMessages);
+ return receivedMessages;
}
public List<String> getReceivedPMessages() {
return new ArrayList<>(receivedPMessages);
}
- public final List<UnsubscribeInfo> unsubscribeInfos =
- Collections.synchronizedList(new ArrayList<>());
- public final List<UnsubscribeInfo> punsubscribeInfos =
- Collections.synchronizedList(new ArrayList<>());
+ @Override
+ public void onMessage(String channel, String message) {
+ switchThreadName(String.format("MESSAGE %s %s", channel, message));
+ receivedMessages.add(message);
+ }
+
+ @Override
+ public void onPMessage(String pattern, String channel, String message) {
+ switchThreadName(String.format("PMESSAGE %s %s %s", pattern, channel, message));
+ receivedPMessages.add(message);
+ }
+
+ @Override
+ public void onSubscribe(String channel, int subscribedChannels) {
+ switchThreadName(String.format("SUBSCRIBE %s", channel));
+ subscriptionLatch.countDown();
+ }
+
+ private static final int AWAIT_TIMEOUT_MILLIS = 30000;
+
+ public void awaitSubscribe(String channel) {
+ try {
+ if (!subscriptionLatch.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ throw new RuntimeException("awaitSubscribe timed out for channel: " + channel);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
+ switchThreadName(String.format("UNSUBSCRIBE %s %d", channel, subscribedChannels));
unsubscribeInfos.add(new UnsubscribeInfo(channel, subscribedChannels));
+ unsubscriptionLatch.countDown();
+ }
+
+ public void awaitUnsubscribe(String channel) {
+ try {
+ if (!unsubscriptionLatch.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ throw new RuntimeException("awaitUnsubscribe timed out for channel: " + channel);
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
+ switchThreadName(String.format("PUNSUBSCRIBE %s %d", pattern, subscribedChannels));
punsubscribeInfos.add(new UnsubscribeInfo(pattern, subscribedChannels));
}
@@ -86,14 +164,4 @@ public class MockSubscriber extends JedisPubSub {
}
}
-
- @Override
- public void onMessage(String channel, String message) {
- receivedMessages.add(message);
- }
-
- @Override
- public void onPMessage(String pattern, String channel, String message) {
- receivedPMessages.add(message);
- }
}
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/MockSubscriber.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/MockSubscriber.java
deleted file mode 100644
index 5ac9c38..0000000
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/MockSubscriber.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.geode.redis;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.logging.log4j.Logger;
-import redis.clients.jedis.Client;
-import redis.clients.jedis.JedisPubSub;
-
-import org.apache.geode.logging.internal.log4j.api.LogService;
-
-public class MockSubscriber extends JedisPubSub {
- private CountDownLatch latch;
- private List<String> receivedMessages = new ArrayList<String>();
- private Client client;
-
- private static final Logger logger = LogService.getLogger();
-
- public MockSubscriber(CountDownLatch latch) {
- this.latch = latch;
- }
-
- public List<String> getReceivedMessages() {
- return receivedMessages;
- }
-
- @Override
- public void onSubscribe(String channel, int subscribedChannels) {
- // logger.info("--->>> Received subscription for " + client.getSocket());
- latch.countDown();
- }
-
- @Override
- public void onMessage(String channel, String message) {
- receivedMessages.add(message);
- }
-
- @Override
- public void proceed(Client client, String... channels) {
- this.client = client;
- // logger.info("--->>> Before for " + client.getSocket());
- super.proceed(client, channels);
- // logger.info("--->>> After for " + client.getSocket());
- }
-}
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
index 52ed634..4ea912a 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
@@ -30,25 +30,35 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.Configurator;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
-import org.apache.geode.redis.MockSubscriber;
+import org.apache.geode.logging.internal.executors.LoggingThread;
+import org.apache.geode.logging.internal.log4j.api.FastLogger;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.mocks.MockSubscriber;
import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;
import org.apache.geode.test.junit.rules.GfshCommandRule;
+
public class PubSubDUnitTest {
public static final String CHANNEL_NAME = "salutations";
+ public static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
@ClassRule
public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(6);
@@ -92,6 +102,14 @@ public class PubSubDUnitTest {
server4 = cluster.startRedisVM(4, locator.getPort());
server5 = cluster.startServerVM(5, locator.getPort());
+ for (VM v : Host.getHost(0).getAllVMs()) {
+ v.invoke(() -> {
+ Logger logger = LogService.getLogger("org.apache.geode.redis");
+ Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
+ FastLogger.setDelegating(true);
+ });
+ }
+
redisServerPort1 = cluster.getRedisPort(1);
redisServerPort2 = cluster.getRedisPort(2);
redisServerPort3 = cluster.getRedisPort(3);
@@ -105,12 +123,6 @@ public class PubSubDUnitTest {
gfsh.connectAndVerify(locator);
}
- @Before
- public void testSetup() {
- subscriber1.flushAll();
- subscriber2.flushAll();
- }
-
@AfterClass
public static void tearDown() {
subscriber1.disconnect();
@@ -126,6 +138,70 @@ public class PubSubDUnitTest {
}
@Test
+ public void shouldNotHang_givenPublishingAndSubscribingSimultaneously() {
+ ArrayList<Thread> threads = new ArrayList<>();
+ AtomicLong publishCount = new AtomicLong();
+ Random random = new Random();
+ int SUBSCRIBER_COUNT = 5;
+ int CHANNEL_COUNT = 200;
+
+ for (int i = 0; i < CHANNEL_COUNT; i++) {
+ String channelName = "theBestChannel" + i;
+ Thread thread = new LoggingThread(channelName, () -> {
+ ArrayList<MockSubscriber> mockSubscribers = new ArrayList<>();
+ ArrayList<Jedis> clients = new ArrayList<>();
+ ArrayList<Future<Void>> subscribeFutures = new ArrayList<>();
+
+ for (int j = 0; j < SUBSCRIBER_COUNT; j++) {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+ mockSubscribers.add(mockSubscriber);
+ Jedis client = getConnection(random);
+ clients.add(client);
+
+ Future<Void> f = executor.submit(() -> {
+ client.subscribe(mockSubscriber, channelName);
+ });
+ subscribeFutures.add(f);
+ }
+
+ mockSubscribers.forEach(x -> x.awaitSubscribe(channelName));
+
+ Jedis localPublisher = getConnection(random);
+ long published = localPublisher.publish(channelName, "hi");
+ publishCount.getAndAdd(published);
+ localPublisher.close();
+
+ mockSubscribers.forEach(s -> {
+ s.unsubscribe(channelName);
+ s.awaitUnsubscribe(channelName);
+ });
+
+ subscribeFutures.forEach(x -> {
+ try {
+ x.get(JEDIS_TIMEOUT, TimeUnit.MILLISECONDS);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ clients.forEach(Jedis::close);
+ });
+
+ threads.add(thread);
+ thread.start();
+ }
+
+ threads.forEach(thread -> {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ assertThat(publishCount.get()).isEqualTo(CHANNEL_COUNT * SUBSCRIBER_COUNT);
+ }
+
+ @Test
public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSubscribersOnePublisher()
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
@@ -133,13 +209,12 @@ public class PubSubDUnitTest {
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
- Future<Void> subscriber1Future = executor.submit(
- () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
- Future<Void> subscriber2Future = executor.submit(
- () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+ Future<Void> subscriber1Future =
+ executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+ Future<Void> subscriber2Future =
+ executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
- assertThat(latch.await(30, TimeUnit.SECONDS))
- .as("channel subscription was not received")
+ assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();
Long result = publisher1.publish(CHANNEL_NAME, "hello");
@@ -164,13 +239,12 @@ public class PubSubDUnitTest {
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
- Future<Void> subscriber1Future = executor.submit(
- () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
- Future<Void> subscriber2Future = executor.submit(
- () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+ Future<Void> subscriber1Future =
+ executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+ Future<Void> subscriber2Future =
+ executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
- assertThat(latch.await(30, TimeUnit.SECONDS))
- .as("channel subscription was not received")
+ assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();
Long result = publisher1.publish(CHANNEL_NAME, "hello");
@@ -215,13 +289,12 @@ public class PubSubDUnitTest {
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
- Future<Void> subscriber1Future = executor.submit(
- () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
- Future<Void> subscriber2Future = executor.submit(
- () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+ Future<Void> subscriber1Future =
+ executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+ Future<Void> subscriber2Future =
+ executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
- assertThat(latch.await(30, TimeUnit.SECONDS))
- .as("channel subscription was not received")
+ assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();
Long resultPublisher1 = publisher1.publish(CHANNEL_NAME, "hello");
@@ -248,13 +321,12 @@ public class PubSubDUnitTest {
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
- Future<Void> subscriber1Future = executor.submit(
- () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
- Future<Void> subscriber2Future = executor.submit(
- () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+ Future<Void> subscriber1Future =
+ executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+ Future<Void> subscriber2Future =
+ executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
- assertThat(latch.await(30, TimeUnit.SECONDS))
- .as("channel subscription was not received")
+ assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();
Long result = publisher1.publish(CHANNEL_NAME, "hello");
@@ -276,13 +348,12 @@ public class PubSubDUnitTest {
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
- Future<Void> subscriber1Future = executor.submit(
- () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
- Future<Void> subscriber2Future = executor.submit(
- () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+ Future<Void> subscriber1Future =
+ executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+ Future<Void> subscriber2Future =
+ executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
- assertThat(latch.await(30, TimeUnit.SECONDS))
- .as("channel subscription was not received")
+ assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
.isTrue();
List<Future<Void>> futures = new LinkedList<>();
@@ -321,6 +392,7 @@ public class PubSubDUnitTest {
String CHANNEL_NAME = "best_channel_ever";
List<Jedis> clients = new ArrayList<>();
+ List<MockSubscriber> subscribers = new ArrayList<>();
// Build up an initial set of subscribers
for (int i = 0; i < CLIENT_COUNT; i++) {
@@ -332,6 +404,7 @@ public class PubSubDUnitTest {
executor.submit(() -> client.subscribe(mockSubscriber, CHANNEL_NAME));
latch.await();
+ subscribers.add(mockSubscriber);
clients.add(client);
}
@@ -344,13 +417,18 @@ public class PubSubDUnitTest {
assertThat(result).isEqualTo(CLIENT_COUNT * 10);
+ subscribers.forEach(x -> {
+ x.unsubscribe();
+ x.awaitUnsubscribe(CHANNEL_NAME);
+ });
clients.forEach(Jedis::close);
}
@Test
public void testPubSubWithManyClientsDisconnecting() throws Exception {
- int CLIENT_COUNT = 10;
- int ITERATIONS = 400;
+ int CLIENT_COUNT = 1;
+ int ITERATIONS = 1000;
+ String LOCAL_CHANNEL_NAME = "disconnecting_channel";
Random random = new Random();
List<Jedis> clients = new ArrayList<>();
@@ -362,15 +440,15 @@ public class PubSubDUnitTest {
CountDownLatch latch = new CountDownLatch(1);
MockSubscriber mockSubscriber = new MockSubscriber(latch);
- executor.submit(() -> client.subscribe(mockSubscriber, CHANNEL_NAME));
+ executor.submit(() -> client.subscribe(mockSubscriber, LOCAL_CHANNEL_NAME));
latch.await();
}
// Start actively publishing in the background
- Jedis publishingClient = new Jedis("localhost", redisServerPort1);
+ Jedis publishingClient = new Jedis("localhost", redisServerPort1, 60_000);
Callable<Void> callable = () -> {
for (int j = 0; j < ITERATIONS; j++) {
- publishingClient.publish(CHANNEL_NAME, "hello - " + j);
+ publishingClient.publish(LOCAL_CHANNEL_NAME, "hello - " + j);
}
return null;
};
@@ -385,7 +463,7 @@ public class PubSubDUnitTest {
Jedis client = new Jedis("localhost", redisServerPort1);
CountDownLatch latch = new CountDownLatch(1);
MockSubscriber mockSubscriber = new MockSubscriber(latch);
- executor.submit(() -> client.subscribe(mockSubscriber, CHANNEL_NAME));
+ executor.submit(() -> client.subscribe(mockSubscriber, LOCAL_CHANNEL_NAME));
latch.await();
clients.set(candy, client);
@@ -409,12 +487,11 @@ public class PubSubDUnitTest {
}
private void waitForRestart() {
- await()
- .untilAsserted(() -> gfsh.executeAndAssertThat("list members")
- .statusIsSuccess()
- .hasTableSection()
+ await().untilAsserted(
+ () -> gfsh.executeAndAssertThat("list members").statusIsSuccess().hasTableSection()
.hasColumn("Name")
- .containsOnly("locator-0", "server-1", "server-2", "server-3", "server-4", "server-5"));
+ .containsOnly("locator-0", "server-1", "server-2", "server-3", "server-4",
+ "server-5"));
}
private void reconnectSubscriber1() {
@@ -424,4 +501,25 @@ public class PubSubDUnitTest {
private void reconnectSubscriber2() {
subscriber2 = new Jedis(LOCAL_HOST, redisServerPort2);
}
+
+ private Jedis getConnection(Random random) {
+ Jedis client = null;
+
+ for (int i = 0; i < 10; i++) {
+ int randPort = random.nextInt(4) + 1;
+ try {
+ client = new Jedis("localhost", cluster.getRedisPort(randPort), JEDIS_TIMEOUT);
+ client.ping();
+ return client;
+ } catch (Exception e) {
+ try {
+ if (client != null) {
+ client.close();
+ }
+ } catch (Exception exception) {
+ }
+ }
+ }
+ throw new RuntimeException("Tried 10 times, but could not get a good connection.");
+ }
}
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
index 639ea45..5e011e9 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
@@ -17,16 +17,25 @@ package org.apache.geode.redis.session;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.test.junit.categories.RedisTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
@Category({RedisTest.class})
public class RedisSessionDUnitTest extends SessionDUnitTest {
+ @Rule
+ public ExecutorServiceRule executor = new ExecutorServiceRule();
+
@BeforeClass
public static void setup() {
SessionDUnitTest.setup();
@@ -45,6 +54,42 @@ public class RedisSessionDUnitTest extends SessionDUnitTest {
}
@Test
+ public void createSessionsConcurrently() throws ExecutionException, InterruptedException {
+ AtomicBoolean running = new AtomicBoolean(true);
+ CountDownLatch latch = new CountDownLatch(1);
+
+ Future<Void> future1 = executor.submit(() -> sessionCreator(1, 100, running, latch));
+ Future<Void> future2 = executor.submit(() -> sessionCreator(2, 100, running, latch));
+ Future<Void> future3 = executor.submit(() -> sessionCreator(3, 100, running, latch));
+
+ latch.countDown();
+
+ future1.get();
+ future2.get();
+ future3.get();
+ }
+
+ private void sessionCreator(int index, int iterations, AtomicBoolean running,
+ CountDownLatch latch)
+ throws InterruptedException {
+ int iterationCount = 0;
+ latch.await();
+ while (iterationCount < iterations && running.get()) {
+ String noteName = String.format("note-%d-%d", index, iterationCount);
+ try {
+ String sessionCookie = createNewSessionWithNote(APP1, noteName);
+ String[] sessionNotes = getSessionNotes(APP2, sessionCookie);
+ assertThat(sessionNotes).containsExactly(noteName);
+ } catch (Exception e) {
+ running.set(false);
+ throw new RuntimeException("BANG " + noteName, e);
+ }
+
+ iterationCount++;
+ }
+ }
+
+ @Test
public void should_storeSession() {
String sessionCookie = createNewSessionWithNote(APP1, "note1");
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java
new file mode 100644
index 0000000..23c0073
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.pubsub;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisFuture;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.redis.GeodeRedisServerRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+/**
+ * Pub/sub stress test driven through the Lettuce client: many subscribers on one channel and
+ * several concurrent publishers, checking that the publish counts reported by the server add up.
+ */
+public class LettucePubSubIntegrationTest {
+
+  private static final String CHANNEL = "best-channel";
+  private RedisClient client;
+
+  @ClassRule
+  public static GeodeRedisServerRule server = new GeodeRedisServerRule();
+
+  @ClassRule
+  public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Before
+  public void before() {
+    client = RedisClient.create("redis://localhost:" + server.getPort());
+  }
+
+  @After
+  public void after() {
+    client.shutdown();
+  }
+
+  @Test
+  public void concurrentPublishersToMultipleSubscribers_doNotLosePublishMessages()
+      throws Exception {
+    int subscriberCount = 50;
+    int publisherCount = 10;
+    int publishIterations = 10000;
+
+    // The subscriber connections are deliberately kept open for the whole test so that every
+    // publish has subscriberCount receivers; presumably client.shutdown() in after() releases
+    // them.
+    for (int i = 0; i < subscriberCount; i++) {
+      StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub();
+      subscriber.sync().subscribe(CHANNEL);
+    }
+
+    List<Future<Long>> results = new ArrayList<>();
+    for (int i = 0; i < publisherCount; i++) {
+      int localI = i;
+      results.add(executor.submit(() -> publish(localI, publishIterations)));
+    }
+
+    long publishCount = 0;
+    for (Future<Long> r : results) {
+      publishCount += r.get();
+    }
+
+    // Widen before multiplying so that raising the counts above cannot overflow int.
+    long expectedPublishCount = (long) subscriberCount * publisherCount * publishIterations;
+    assertThat(publishCount).isEqualTo(expectedPublishCount);
+  }
+
+  /**
+   * Publishes {@code iterationCount} messages asynchronously on its own connection and returns
+   * the sum of the counts returned by the individual publish calls.
+   *
+   * @param index publisher number, embedded in each message
+   * @param iterationCount number of messages to publish
+   * @return the sum of all publish results
+   * @throws Exception if waiting for any publish result fails
+   */
+  private Long publish(int index, int iterationCount) throws Exception {
+    // Close the publisher connection once all results are in rather than leaving it open
+    // until the client is shut down.
+    try (StatefulRedisPubSubConnection<String, String> publisher = client.connectPubSub()) {
+      List<RedisFuture<Long>> results = new ArrayList<>();
+      for (int i = 0; i < iterationCount; i++) {
+        results.add(publisher.async().publish(CHANNEL, "message-" + index + "-" + i));
+      }
+
+      long publishCount = 0;
+      for (RedisFuture<Long> r : results) {
+        publishCount += r.get();
+      }
+
+      return publishCount;
+    }
+  }
+}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
index 20c4a60..5d341cf 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
@@ -20,10 +20,21 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.Configurator;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -32,6 +43,8 @@ import org.junit.experimental.categories.Category;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Protocol;
+import org.apache.geode.logging.internal.log4j.api.FastLogger;
+import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.GeodeRedisServerRule;
import org.apache.geode.redis.mocks.MockBinarySubscriber;
import org.apache.geode.redis.mocks.MockSubscriber;
@@ -43,7 +56,7 @@ import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class PubSubIntegrationTest {
static Jedis publisher;
static Jedis subscriber;
- static final int REDIS_CLIENT_TIMEOUT = 100000;
+ public static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
@ClassRule
public static GeodeRedisServerRule server = new GeodeRedisServerRule();
@@ -53,8 +66,8 @@ public class PubSubIntegrationTest {
@BeforeClass
public static void setUp() {
- subscriber = new Jedis("localhost", server.getPort(), REDIS_CLIENT_TIMEOUT);
- publisher = new Jedis("localhost", server.getPort(), REDIS_CLIENT_TIMEOUT);
+ subscriber = new Jedis("localhost", server.getPort(), JEDIS_TIMEOUT);
+ publisher = new Jedis("localhost", server.getPort(), JEDIS_TIMEOUT);
}
@AfterClass
@@ -379,11 +392,13 @@ public class PubSubIntegrationTest {
waitFor(() -> !subscriberThread.isAlive());
List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
- .map(x -> x.channel).collect(Collectors.toList());
+ .map(x -> x.channel)
+ .collect(Collectors.toList());
assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");
List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
- .map(x -> x.count).collect(Collectors.toList());
+ .map(x -> x.count)
+ .collect(Collectors.toList());
assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);
}
@@ -404,11 +419,13 @@ public class PubSubIntegrationTest {
waitFor(() -> !subscriberThread.isAlive());
List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
- .map(x -> x.channel).collect(Collectors.toList());
+ .map(x -> x.channel)
+ .collect(Collectors.toList());
assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");
List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
- .map(x -> x.count).collect(Collectors.toList());
+ .map(x -> x.count)
+ .collect(Collectors.toList());
assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);
Long result = publisher.publish("salutations", "greetings");
@@ -455,7 +472,7 @@ public class PubSubIntegrationTest {
@Test
public void testTwoSubscribersOneChannel() {
- Jedis subscriber2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
+ Jedis subscriber2 = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
MockSubscriber mockSubscriber1 = new MockSubscriber();
MockSubscriber mockSubscriber2 = new MockSubscriber();
@@ -525,7 +542,7 @@ public class PubSubIntegrationTest {
@Test
public void testDeadSubscriber() {
- Jedis deadSubscriber = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
+ Jedis deadSubscriber = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
MockSubscriber mockSubscriber = new MockSubscriber();
@@ -674,6 +691,153 @@ public class PubSubIntegrationTest {
assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
}
+  /**
+   * Opens a Jedis connection to the test server and verifies it with a PING, retrying up to 10
+   * times with a 100ms pause between attempts.
+   *
+   * @return a connection that has answered a PING
+   * @throws RuntimeException if no attempt succeeds (the last failure is the cause) or if the
+   *         thread is interrupted between attempts
+   */
+  private Jedis getConnection() {
+    Exception lastException = null;
+
+    for (int i = 0; i < 10; i++) {
+      Jedis client = null;
+      try {
+        client = new Jedis("localhost", server.getPort(), JEDIS_TIMEOUT);
+        client.ping();
+        return client;
+      } catch (Exception e) {
+        lastException = e;
+        if (client != null) {
+          try {
+            client.close();
+          } catch (Exception ignored) {
+            // best effort: the connection is already unusable and is being discarded
+          }
+        }
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status and stop retrying instead of swallowing the interrupt.
+        Thread.currentThread().interrupt();
+        RuntimeException interrupted =
+            new RuntimeException("Interrupted while waiting to retry the connection.", e);
+        interrupted.addSuppressed(lastException);
+        throw interrupted;
+      }
+    }
+
+    throw new RuntimeException("Tried 10 times, but could not get a good connection.",
+        lastException);
+  }
+
+  /**
+   * Publishes to "my-channel" on a dedicated connection until at least {@code minimumIterations}
+   * messages have been sent and {@code running} has been cleared.
+   *
+   * @return the sum of the values returned by the individual publish calls
+   */
+  int doPublishing(int index, int minimumIterations, AtomicBoolean running) {
+    int publishedTotal = 0;
+    Jedis publisherClient = getConnection();
+    try {
+      for (int iteration = 0; iteration < minimumIterations || running.get(); iteration++) {
+        String message = "boo-" + index + "-" + iteration;
+        publishedTotal += publisherClient.publish("my-channel", message);
+      }
+    } finally {
+      publisherClient.close();
+    }
+
+    return publishedTotal;
+  }
+
+  /**
+   * Repeatedly runs subscribe/unsubscribe cycles on "my-channel" using a private thread pool,
+   * while a consumer task waits for each cycle to finish and counts it.
+   *
+   * @param index caller-supplied worker number (currently unused)
+   * @param minimumIterations minimum number of cycles to submit and to process
+   * @param running both loops keep going past the minimum for as long as this is true
+   * @return the number of cycles the consumer saw complete
+   * @throws InterruptedException if interrupted while waiting for the consumer
+   * @throws ExecutionException if the consumer failed, e.g. because a cycle failed
+   */
+  int makeSubscribers(int index, int minimumIterations, AtomicBoolean running)
+      throws InterruptedException, ExecutionException {
+    // Named "pool" so that it does not shadow the class-level executor rule.
+    ExecutorService pool = Executors.newFixedThreadPool(100);
+    Queue<Future<Void>> workQ = new ConcurrentLinkedQueue<>();
+    String channel = "my-channel";
+
+    try {
+      Future<Integer> consumer = pool.submit(() -> {
+        int subscribersProcessed = 0;
+        while (subscribersProcessed < minimumIterations || running.get()) {
+          Future<Void> f = workQ.poll();
+          if (f == null) {
+            // nothing submitted yet; let the producer run
+            Thread.yield();
+            continue;
+          }
+          try {
+            f.get();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          subscribersProcessed++;
+        }
+        return subscribersProcessed;
+      });
+
+      int iterationCount = 0;
+      while (iterationCount < minimumIterations || running.get()) {
+        workQ.add(pool.submit(() -> subscribeThenUnsubscribe(channel)));
+        iterationCount++;
+      }
+
+      return consumer.get();
+    } finally {
+      // Always release the pool, including when the consumer or a submission fails.
+      pool.shutdownNow();
+    }
+  }
+
+  /**
+   * Runs one cycle: subscribes to {@code channel} on a fresh connection (the blocking subscribe
+   * call runs on its own thread), waits for the subscription, unsubscribes, and waits for the
+   * blocking call to return.
+   */
+  private Void subscribeThenUnsubscribe(String channel) throws Exception {
+    Jedis client = getConnection();
+    ExecutorService secondaryExecutor = Executors.newSingleThreadExecutor();
+    try {
+      MockSubscriber mockSubscriber = new MockSubscriber();
+      AtomicReference<Thread> innerThread = new AtomicReference<>();
+      Future<Void> inner = secondaryExecutor.submit(() -> {
+        innerThread.set(Thread.currentThread());
+        client.subscribe(mockSubscriber, channel);
+        return null;
+      });
+      mockSubscriber.awaitSubscribe(channel);
+      if (inner.isDone()) {
+        throw new RuntimeException("inner completed before unsubscribe");
+      }
+
+      mockSubscriber.unsubscribe(channel);
+
+      try {
+        inner.get(30, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        // Dump where the subscribing thread is stuck; it may not have been recorded yet.
+        Thread subscriberThread = innerThread.get();
+        if (subscriberThread != null) {
+          LogService.getLogger().debug("=> {} {}", subscriberThread, subscriberThread.getState());
+          for (StackTraceElement st : subscriberThread.getStackTrace()) {
+            LogService.getLogger().debug("-> {}", st);
+          }
+        }
+        // Keep the original exception as the cause so its stack trace is not lost.
+        throw new RuntimeException("inner.get() errored after unsubscribe: " + e.getMessage(), e);
+      }
+
+      mockSubscriber.awaitUnsubscribe(channel);
+      return null;
+    } finally {
+      // Release the connection and the helper thread on every path, not only on success.
+      try {
+        client.close();
+      } finally {
+        secondaryExecutor.shutdownNow();
+      }
+    }
+  }
+
+  @Test
+  public void concurrentSubscribers_andPublishers_doesNotHang()
+      throws InterruptedException, ExecutionException {
+    // Switch the redis loggers to DEBUG for this run.
+    Logger redisLogger = LogService.getLogger("org.apache.geode.redis");
+    Configurator.setAllLevels(redisLogger.getName(), Level.getLevel("DEBUG"));
+    FastLogger.setDelegating(true);
+
+    final int minimumIterations = 10000;
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    Future<Integer> makeSubscribersFuture1 =
+        executor.submit(() -> makeSubscribers(1, minimumIterations, running));
+    Future<Integer> makeSubscribersFuture2 =
+        executor.submit(() -> makeSubscribers(2, minimumIterations, running));
+
+    Future<Integer> publish1 = executor.submit(() -> doPublishing(1, minimumIterations, running));
+    Future<Integer> publish2 = executor.submit(() -> doPublishing(2, minimumIterations, running));
+    Future<Integer> publish3 = executor.submit(() -> doPublishing(3, minimumIterations, running));
+    Future<Integer> publish4 = executor.submit(() -> doPublishing(4, minimumIterations, running));
+    Future<Integer> publish5 = executor.submit(() -> doPublishing(5, minimumIterations, running));
+
+    // Every worker still performs its minimum number of iterations after the flag is cleared.
+    running.set(false);
+
+    for (Future<Integer> subscribers : Arrays.asList(makeSubscribersFuture1,
+        makeSubscribersFuture2)) {
+      assertThat(subscribers.get()).isGreaterThanOrEqualTo(10);
+    }
+
+    for (Future<Integer> publisher : Arrays.asList(publish1, publish2, publish3, publish4,
+        publish5)) {
+      assertThat(publisher.get()).isGreaterThan(0);
+    }
+  }
+
private void waitFor(Callable<Boolean> booleanCallable) {
GeodeAwaitility.await()
.ignoreExceptions() // ignoring socket closed exceptions
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java
index a4dc896..cf79b1c 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java
@@ -108,7 +108,8 @@ public class SubscriptionsIntegrationTest {
for (int i = 0; i < ITERATIONS; i++) {
Client client = new Client(mock(Channel.class));
clients.add(client);
- subscriptions.add(new ChannelSubscription(client, "channel".getBytes(), context));
+ subscriptions
+ .add(new ChannelSubscription(client, "channel".getBytes(), context, subscriptions));
}
new ConcurrentLoopingThreads(1,
@@ -128,7 +129,8 @@ public class SubscriptionsIntegrationTest {
for (int i = 0; i < ITERATIONS; i++) {
Client client = new Client(mock(Channel.class));
clients.add(client);
- subscriptions.add(new ChannelSubscription(client, "channel".getBytes(), context));
+ subscriptions
+ .add(new ChannelSubscription(client, "channel".getBytes(), context, subscriptions));
}
new ConcurrentLoopingThreads(1,
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
index f814a96..cbbec95 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
@@ -52,4 +52,10 @@ public class DummySubscription implements Subscription {
public byte[] getChannelName() {
return null;
}
+
+ @Override
+ public void readyToPublish() {} // no-op: this dummy subscription does nothing when marked ready
+
+ @Override
+ public void shutdown() {} // no-op: this dummy subscription has nothing to release
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index 8f4c727..0d8bcb7 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -93,7 +93,9 @@ public class GeodeRedisServer {
new PassiveExpirationManager(regionProvider.getDataRegion(), redisStats);
nettyRedisServer = new NettyRedisServer(() -> cache.getInternalDistributedSystem().getConfig(),
regionProvider, pubSub,
- this::allowUnsupportedCommands, this::shutdown, port, bindAddress, redisStats);
+ this::allowUnsupportedCommands, this::shutdown, port, bindAddress, redisStats,
+ cache.getInternalDistributedSystem().getDistributionManager().getExecutors()
+ .getWaitingThreadPool());
}
public void setAllowUnsupportedCommands(boolean allowUnsupportedCommands) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
index 8a6d153..4a82c4e 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
@@ -32,10 +32,22 @@ public class RedisResponse {
private final Function<ByteBufAllocator, ByteBuf> coderCallback;
+ private Runnable afterWriteCallback;
+
private RedisResponse(Function<ByteBufAllocator, ByteBuf> coderCallback) {
this.coderCallback = coderCallback;
}
+  /**
+   * Registers the hook that {@link #afterWrite()} runs, replacing any hook set earlier.
+   *
+   * @param callback the hook to run; may be null, in which case {@link #afterWrite()} does nothing
+   */
+  public void setAfterWriteCallback(Runnable callback) {
+    this.afterWriteCallback = callback;
+  }
+
+  /**
+   * Runs the hook registered through {@link #setAfterWriteCallback(Runnable)}, if there is one.
+   */
+  public void afterWrite() {
+    if (afterWriteCallback == null) {
+      return;
+    }
+    afterWriteCallback.run();
+  }
+
public ByteBuf encode(ByteBufAllocator allocator) {
return coderCallback.apply(allocator);
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
index 9ece01f..9244b55 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
@@ -18,34 +18,58 @@ package org.apache.geode.redis.internal.executor.pubsub;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.function.Consumer;
import org.apache.geode.redis.internal.executor.AbstractExecutor;
-import org.apache.geode.redis.internal.executor.GlobPattern;
import org.apache.geode.redis.internal.executor.RedisResponse;
import org.apache.geode.redis.internal.netty.Command;
import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.pubsub.SubscribeResult;
public class PsubscribeExecutor extends AbstractExecutor {
@Override
public RedisResponse executeCommand(Command command,
ExecutionHandlerContext context) {
- Collection<Collection<?>> items = new ArrayList<>();
+
+ Collection<SubscribeResult> results = new ArrayList<>();
for (int i = 1; i < command.getProcessedCommand().size(); i++) {
- Collection<Object> item = new ArrayList<>();
- byte[] pattern = command.getProcessedCommand().get(i);
- long subscribedChannels =
- context.getPubSub().psubscribe(
- new GlobPattern(new String(pattern)), context, context.getClient());
+ byte[] patternBytes = command.getProcessedCommand().get(i);
+ SubscribeResult result =
+ context.getPubSub().psubscribe(patternBytes, context, context.getClient());
+ results.add(result);
+ }
+ Collection<Collection<?>> items = new ArrayList<>();
+ for (SubscribeResult result : results) {
+ Collection<Object> item = new ArrayList<>();
item.add("psubscribe");
- item.add(pattern);
- item.add(subscribedChannels);
-
+ item.add(result.getChannel());
+ item.add(result.getChannelCount());
items.add(item);
}
- return RedisResponse.flattenedArray(items);
+
+ Runnable callback = () -> {
+ Consumer<Boolean> innerCallback = success -> {
+ for (SubscribeResult result : results) {
+ if (result.getSubscription() != null) {
+ if (success) {
+ result.getSubscription().readyToPublish();
+ } else {
+ result.getSubscription().shutdown();
+ }
+ }
+ }
+ };
+ context.changeChannelEventLoopGroup(context.getSubscriberGroup(), innerCallback);
+ };
+
+ RedisResponse response = RedisResponse.flattenedArray(items);
+ response.setAfterWriteCallback(callback);
+
+ return response;
+
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
index 12101d5..ee51563 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
@@ -27,12 +27,13 @@ public class PublishExecutor extends AbstractExecutor {
@Override
public RedisResponse executeCommand(Command command,
ExecutionHandlerContext context) {
- List<byte[]> args = command.getProcessedCommand();
- long publishCount =
- context.getPubSub().publish(getDataRegion(context), args.get(1), args.get(2));
+ List<byte[]> args = command.getProcessedCommand();
+ byte[] channelName = args.get(1);
+ byte[] message = args.get(2);
+ long publishCount = context.getPubSub()
+ .publish(context.getRegionProvider().getDataRegion(), channelName, message);
return RedisResponse.integer(publishCount);
}
-
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
index 0cdacfb..9f2150f 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
@@ -17,34 +17,55 @@ package org.apache.geode.redis.internal.executor.pubsub;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.function.Consumer;
import org.apache.geode.redis.internal.executor.AbstractExecutor;
import org.apache.geode.redis.internal.executor.RedisResponse;
import org.apache.geode.redis.internal.netty.Command;
import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.pubsub.SubscribeResult;
public class SubscribeExecutor extends AbstractExecutor {
@Override
public RedisResponse executeCommand(Command command,
ExecutionHandlerContext context) {
- Collection<Collection<?>> items = new ArrayList<>();
+ Collection<SubscribeResult> results = new ArrayList<>();
for (int i = 1; i < command.getProcessedCommand().size(); i++) {
- Collection<Object> item = new ArrayList<>();
byte[] channelName = command.getProcessedCommand().get(i);
- long subscribedChannels =
+ SubscribeResult result =
context.getPubSub().subscribe(channelName, context, context.getClient());
+ results.add(result);
+ }
+ Collection<Collection<?>> items = new ArrayList<>();
+ for (SubscribeResult result : results) {
+ Collection<Object> item = new ArrayList<>();
item.add("subscribe");
- item.add(channelName);
- item.add(subscribedChannels);
-
+ item.add(result.getChannel());
+ item.add(result.getChannelCount());
items.add(item);
}
- context.changeChannelEventLoopGroup(context.getSubscriberGroup());
+ Runnable callback = () -> {
+ Consumer<Boolean> innerCallback = success -> {
+ for (SubscribeResult result : results) {
+ if (result.getSubscription() != null) {
+ if (success) {
+ result.getSubscription().readyToPublish();
+ } else {
+ result.getSubscription().shutdown();
+ }
+ }
+ }
+ };
+ context.changeChannelEventLoopGroup(context.getSubscriberGroup(), innerCallback);
+ };
+
+ RedisResponse response = RedisResponse.flattenedArray(items);
+ response.setAfterWriteCallback(callback);
- return RedisResponse.flattenedArray(items);
+ return response;
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java
index 14ccfb5..74e516c 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java
@@ -19,6 +19,8 @@ import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.stream.Collectors;
+import io.netty.channel.ChannelHandlerContext;
+
import org.apache.geode.redis.internal.RedisCommandType;
import org.apache.geode.redis.internal.data.ByteArrayWrapper;
import org.apache.geode.redis.internal.executor.RedisResponse;
@@ -35,6 +37,14 @@ public class Command {
private ByteArrayWrapper bytes;
/**
+ * Constructor used to create a static marker Command for shutting down executors.
+ */
+ Command() {
+ commandElems = null;
+ commandType = null;
+ }
+
+ /**
* Constructor for {@link Command}. Must initialize Command with a {@link SocketChannel} and a
* {@link List} of command elements
*
@@ -176,4 +186,24 @@ public class Command {
return String.format("wrong number of arguments for '%s' command",
getCommandType().toString().toLowerCase());
}
+
+  /** Start timestamp recorded through {@link #setAsyncStartTime(long)}; 0 until it is set. */
+  private long asyncStartTime;
+
+  /** Netty handler context attached to this command; null until it is set. */
+  private ChannelHandlerContext channelHandlerContext;
+
+  public void setAsyncStartTime(long start) {
+    this.asyncStartTime = start;
+  }
+
+  public long getAsyncStartTime() {
+    return this.asyncStartTime;
+  }
+
+  public void setChannelHandlerContext(ChannelHandlerContext ctx) {
+    this.channelHandlerContext = ctx;
+  }
+
+  public ChannelHandlerContext getChannelHandlerContext() {
+    return this.channelHandlerContext;
+  }
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index e3f5546..ab33f84 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -17,6 +17,9 @@ package org.apache.geode.redis.internal.netty;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Consumer;
import java.util.function.Supplier;
import io.netty.buffer.ByteBuf;
@@ -61,17 +64,22 @@ import org.apache.geode.redis.internal.pubsub.PubSub;
public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
private static final Logger logger = LogService.getLogger();
+ private static final Command TERMINATE_COMMAND = new Command();
private final Client client;
private final Channel channel;
private final RegionProvider regionProvider;
private final PubSub pubsub;
- private final EventLoopGroup subscriberGroup;
private final ByteBufAllocator byteBufAllocator;
private final byte[] authPassword;
private final Supplier<Boolean> allowUnsupportedSupplier;
private final Runnable shutdownInvoker;
private final RedisStats redisStats;
+ private final EventLoopGroup subscriberGroup;
+ private final int MAX_QUEUED_COMMANDS =
+ Integer.getInteger("geode.redis.commandQueueSize", 1000);
+ private final LinkedBlockingQueue<Command> commandQueue =
+ new LinkedBlockingQueue<>(MAX_QUEUED_COMMANDS);
private boolean isAuthenticated;
@@ -82,31 +90,57 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
* @param password Authentication password for each context, can be null
*/
public ExecutionHandlerContext(Channel channel, RegionProvider regionProvider, PubSub pubsub,
- EventLoopGroup subscriberGroup,
Supplier<Boolean> allowUnsupportedSupplier,
Runnable shutdownInvoker,
RedisStats redisStats,
+ ExecutorService backgroundExecutor,
+ EventLoopGroup subscriberGroup,
byte[] password) {
this.channel = channel;
this.regionProvider = regionProvider;
this.pubsub = pubsub;
- this.subscriberGroup = subscriberGroup;
this.allowUnsupportedSupplier = allowUnsupportedSupplier;
this.shutdownInvoker = shutdownInvoker;
this.redisStats = redisStats;
+ this.subscriberGroup = subscriberGroup;
this.client = new Client(channel);
this.byteBufAllocator = this.channel.alloc();
this.authPassword = password;
this.isAuthenticated = password == null;
redisStats.addClient();
- }
- public ChannelFuture writeToChannel(ByteBuf message) {
- return channel.writeAndFlush(message, channel.newPromise());
+ backgroundExecutor.submit(this::processCommandQueue);
}
public ChannelFuture writeToChannel(RedisResponse response) {
- return channel.writeAndFlush(response.encode(byteBufAllocator), channel.newPromise());
+ return channel.writeAndFlush(response.encode(byteBufAllocator), channel.newPromise())
+ .addListener((ChannelFutureListener) f -> {
+ response.afterWrite();
+ logResponse(response, channel.remoteAddress().toString(), f.cause());
+ });
+ }
+
+  /**
+   * Drains the command queue, executing each command in turn, until the terminate marker is
+   * taken. Anything thrown while executing a command is handed to
+   * {@code exceptionCaught} together with that command's channel handler context.
+   */
+  private void processCommandQueue() {
+    Command next;
+    while ((next = takeCommandFromQueue()) != TERMINATE_COMMAND) {
+      try {
+        executeCommand(next);
+      } catch (Throwable failure) {
+        exceptionCaught(next.getChannelHandlerContext(), failure);
+      }
+    }
+  }
+
+  /**
+   * Blocks until a command is available on the queue.
+   *
+   * @return the next queued command, or {@code TERMINATE_COMMAND} if the thread was interrupted
+   *         while waiting
+   */
+  private Command takeCommandFromQueue() {
+    try {
+      return commandQueue.take();
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so the executor that owns this thread can still see it.
+      Thread.currentThread().interrupt();
+      logger.info("Command queue thread interrupted");
+      return TERMINATE_COMMAND;
+    }
+  }
/**
@@ -115,17 +149,8 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Command command = (Command) msg;
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Executing Redis command: {}", command);
- }
-
- executeCommand(ctx, command);
- } catch (Exception e) {
- logger.warn("Execution of Redis command {} failed: {}", command, e);
- throw e;
- }
-
+ command.setChannelHandlerContext(ctx);
+ commandQueue.put(command);
}
/**
@@ -139,6 +164,33 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
}
}
+ public EventLoopGroup getSubscriberGroup() {
+ return subscriberGroup; // the group the subscribe executors pass to changeChannelEventLoopGroup
+ }
+
+  /**
+   * Re-registers this connection's channel with {@code newGroup} and reports the outcome.
+   *
+   * @param newGroup the group the channel should be registered with
+   * @param callback receives {@code true} if the channel is registered once the switch has been
+   *        attempted, {@code false} if registering with {@code newGroup} threw
+   */
+  public synchronized void changeChannelEventLoopGroup(EventLoopGroup newGroup,
+      Consumer<Boolean> callback) {
+    if (newGroup.equals(channel.eventLoop())) {
+      // already registered with newGroup
+      callback.accept(true);
+      return;
+    }
+    channel.deregister().addListener(
+        (ChannelFutureListener) future -> callback.accept(registerChannelWith(newGroup)));
+  }
+
+  /**
+   * Registers the channel with {@code newGroup} unless it is already registered, holding the
+   * channel's monitor while doing so.
+   *
+   * @return {@code false} only if the registration attempt threw
+   */
+  private boolean registerChannelWith(EventLoopGroup newGroup) {
+    synchronized (channel) {
+      if (channel.isRegistered()) {
+        return true;
+      }
+      try {
+        newGroup.register(channel).sync();
+        return true;
+      } catch (Exception e) {
+        logger.warn("Unable to register new EventLoopGroup: {}", e.getMessage());
+        return false;
+      }
+    }
+  }
+
private RedisResponse getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) {
RedisResponse response;
if (cause instanceof IOException) {
@@ -192,43 +244,50 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
if (logger.isDebugEnabled()) {
logger.debug("GeodeRedisServer-Connection closing with " + ctx.channel().remoteAddress());
}
+ commandQueue.offer(TERMINATE_COMMAND);
redisStats.removeClient();
ctx.channel().close();
ctx.close();
}
- private void executeCommand(ChannelHandlerContext ctx, Command command) {
- RedisResponse response;
+ private void executeCommand(Command command) {
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Executing Redis command: {} - {}", command,
+ channel.remoteAddress().toString());
+ }
- if (!isAuthenticated()) {
- response = handleUnAuthenticatedCommand(command);
- writeToChannel(response);
- return;
- }
+ if (!isAuthenticated()) {
+ writeToChannel(handleUnAuthenticatedCommand(command));
+ return;
+ }
- if (command.isUnsupported() && !allowUnsupportedCommands()) {
- writeToChannel(
- RedisResponse.error(command.getCommandType() + RedisConstants.ERROR_UNSUPPORTED_COMMAND));
- return;
- }
+ if (command.isUnsupported() && !allowUnsupportedCommands()) {
+ writeToChannel(
+ RedisResponse
+ .error(command.getCommandType() + RedisConstants.ERROR_UNSUPPORTED_COMMAND));
+ return;
+ }
- if (command.isUnimplemented()) {
- logger.info("Failed " + command.getCommandType() + " because it is not implemented.");
- writeToChannel(RedisResponse.error(command.getCommandType() + " is not implemented."));
- return;
- }
+ if (command.isUnimplemented()) {
+ logger.info("Failed " + command.getCommandType() + " because it is not implemented.");
+ writeToChannel(RedisResponse.error(command.getCommandType() + " is not implemented."));
+ return;
+ }
- final long start = redisStats.startCommand(command.getCommandType());
- try {
- response = command.execute(this);
- logResponse(response);
- writeToChannel(response);
- } finally {
- redisStats.endCommand(command.getCommandType(), start);
- }
+ final long start = redisStats.startCommand(command.getCommandType());
+ try {
+ writeToChannel(command.execute(this));
+ } finally {
+ redisStats.endCommand(command.getCommandType(), start);
+ }
- if (command.isOfType(RedisCommandType.QUIT)) {
- channelInactive(ctx);
+ if (command.isOfType(RedisCommandType.QUIT)) {
+ channelInactive(command.getChannelHandlerContext());
+ }
+ } catch (Exception e) {
+ logger.warn("Execution of Redis command {} failed: {}", command, e);
+ throw e;
}
}
@@ -247,25 +306,16 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
return response;
}
- public EventLoopGroup getSubscriberGroup() {
- return subscriberGroup;
- }
-
- public void changeChannelEventLoopGroup(EventLoopGroup newGroup) {
- if (newGroup.equals(channel.eventLoop())) {
- // already registered with newGroup
- return;
- }
- channel.deregister().addListener((ChannelFutureListener) future -> {
- newGroup.register(channel).sync();
- });
- }
-
- private void logResponse(RedisResponse response) {
+ private void logResponse(RedisResponse response, String extraMessage, Throwable cause) {
if (logger.isDebugEnabled() && response != null) {
ByteBuf buf = response.encode(new UnpooledByteBufAllocator(false));
- logger.debug("Redis command returned: {}",
- Command.getHexEncodedString(buf.array(), buf.readableBytes()));
+ if (cause == null) {
+ logger.debug("Redis command returned: {} - {}",
+ Command.getHexEncodedString(buf.array(), buf.readableBytes()), extraMessage);
+ } else {
+ logger.debug("Redis command FAILED to return: {} - {}",
+ Command.getHexEncodedString(buf.array(), buf.readableBytes()), extraMessage, cause);
+ }
}
}
@@ -329,5 +379,4 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
return pubsub;
}
-
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
index e5c53dd..bdcc80b 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
@@ -26,6 +26,7 @@ import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.UnrecoverableKeyException;
import java.security.cert.CertificateException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.Supplier;
@@ -77,6 +78,7 @@ public class NettyRedisServer {
private final Supplier<Boolean> allowUnsupportedSupplier;
private final Runnable shutdownInvoker;
private final RedisStats redisStats;
+ private final ExecutorService backgroundExecutor;
private final EventLoopGroup selectorGroup;
private final EventLoopGroup workerGroup;
private final EventLoopGroup subscriberGroup;
@@ -88,13 +90,14 @@ public class NettyRedisServer {
RegionProvider regionProvider, PubSub pubsub,
Supplier<Boolean> allowUnsupportedSupplier,
Runnable shutdownInvoker, int port, String requestedAddress,
- RedisStats redisStats) {
+ RedisStats redisStats, ExecutorService backgroundExecutor) {
this.configSupplier = configSupplier;
this.regionProvider = regionProvider;
this.pubsub = pubsub;
this.allowUnsupportedSupplier = allowUnsupportedSupplier;
this.shutdownInvoker = shutdownInvoker;
this.redisStats = redisStats;
+ this.backgroundExecutor = backgroundExecutor;
if (port < RANDOM_PORT_INDICATOR) {
throw new IllegalArgumentException("Redis port cannot be less than 0");
}
@@ -127,6 +130,7 @@ public class NettyRedisServer {
closeFuture = serverChannel.closeFuture();
}
workerGroup.shutdownGracefully();
+ subscriberGroup.shutdownGracefully();
Future<?> bossFuture = selectorGroup.shutdownGracefully();
if (serverChannel != null) {
serverChannel.close();
@@ -158,8 +162,9 @@ public class NettyRedisServer {
pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder());
pipeline.addLast(new WriteTimeoutHandler(10));
pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(),
- new ExecutionHandlerContext(socketChannel, regionProvider, pubsub, subscriberGroup,
- allowUnsupportedSupplier, shutdownInvoker, redisStats, redisPasswordBytes));
+ new ExecutionHandlerContext(socketChannel, regionProvider, pubsub,
+ allowUnsupportedSupplier, shutdownInvoker, redisStats, backgroundExecutor,
+ subscriberGroup, redisPasswordBytes));
}
};
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java
index 298fc58..b446a1f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java
@@ -16,14 +16,14 @@
package org.apache.geode.redis.internal.pubsub;
-import io.netty.buffer.ByteBuf;
+import java.util.concurrent.CountDownLatch;
+
import io.netty.channel.ChannelFutureListener;
import org.apache.logging.log4j.Logger;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.executor.RedisResponse;
import org.apache.geode.redis.internal.netty.Client;
-import org.apache.geode.redis.internal.netty.Coder;
-import org.apache.geode.redis.internal.netty.CoderException;
import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
public abstract class AbstractSubscription implements Subscription {
@@ -31,25 +31,61 @@ public abstract class AbstractSubscription implements Subscription {
private final Client client;
private final ExecutionHandlerContext context;
- AbstractSubscription(Client client, ExecutionHandlerContext context) {
+ // Two things have to happen before we are ready to publish:
+ // 1 - we need to make sure the subscriber has switched EventLoopGroups
+ // 2 - the response to the SUBSCRIBE command has been submitted to the client
+ private final CountDownLatch readyForPublish = new CountDownLatch(1);
+ private final Subscriptions subscriptions;
+ private boolean running = true;
+
+ AbstractSubscription(Client client, ExecutionHandlerContext context,
+ Subscriptions subscriptions) {
if (client == null) {
throw new IllegalArgumentException("client cannot be null");
}
if (context == null) {
throw new IllegalArgumentException("context cannot be null");
}
+ if (subscriptions == null) {
+ throw new IllegalArgumentException("subscriptions cannot be null");
+ }
this.client = client;
this.context = context;
+ this.subscriptions = subscriptions;
+ }
+
+ @Override
+ public void readyToPublish() {
+ readyForPublish.countDown();
}
@Override
public void publishMessage(byte[] channel, byte[] message,
PublishResultCollector publishResultCollector) {
- ByteBuf messageByteBuffer = constructResponse(channel, message);
- writeToChannel(messageByteBuffer, publishResultCollector);
+ try {
+ readyForPublish.await();
+ } catch (InterruptedException e) {
+ // we must be shutting down or registration failed
+ Thread.interrupted();
+ running = false;
+ }
+
+ if (running) {
+ writeToChannel(constructResponse(channel, message), publishResultCollector);
+ } else {
+ publishResultCollector.failure(client);
+ }
}
- Client getClient() {
+ @Override
+ public synchronized void shutdown() {
+ running = false;
+ subscriptions.remove(client);
+ // release any threads currently waiting to publish
+ readyToPublish();
+ }
+
+ public Client getClient() {
return client;
}
@@ -58,25 +94,17 @@ public abstract class AbstractSubscription implements Subscription {
return this.client.equals(client);
}
- private ByteBuf constructResponse(byte[] channel, byte[] message) {
- ByteBuf messageByteBuffer;
- try {
- messageByteBuffer = Coder.getArrayResponse(context.getByteBufAllocator(),
- createResponse(channel, message));
- } catch (CoderException e) {
- logger.warn("Unable to encode publish message", e);
- return null;
- }
- return messageByteBuffer;
+ private RedisResponse constructResponse(byte[] channel, byte[] message) {
+ return RedisResponse.array(createResponse(channel, message));
}
/**
- * This method turns the response into a synchronous call. We want to determine if the response,
- * to the client, resulted in an error - for example if the client has disconnected and the write
- * fails. In such cases we need to be able to notify the caller.
+ * We want to determine whether writing the response to the client resulted in an error - for
+ * example if the client has disconnected and the write fails. In such cases we need to be able
+ * to notify the caller.
*/
- private void writeToChannel(ByteBuf messageByteBuffer, PublishResultCollector resultCollector) {
- context.writeToChannel(messageByteBuffer)
+ private void writeToChannel(RedisResponse response, PublishResultCollector resultCollector) {
+ context.writeToChannel(response)
.addListener((ChannelFutureListener) future -> {
if (future.cause() == null) {
resultCollector.success();
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
index 5d1b69d..8b1d8d1 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
@@ -28,8 +28,9 @@ import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
class ChannelSubscription extends AbstractSubscription {
private byte[] channel;
- public ChannelSubscription(Client client, byte[] channel, ExecutionHandlerContext context) {
- super(client, context);
+ public ChannelSubscription(Client client, byte[] channel, ExecutionHandlerContext context,
+ Subscriptions subscriptions) {
+ super(client, context, subscriptions);
if (channel == null) {
throw new IllegalArgumentException("channel cannot be null");
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
index 554946a..d197224 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
@@ -29,8 +29,9 @@ import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
class PatternSubscription extends AbstractSubscription {
final GlobPattern pattern;
- public PatternSubscription(Client client, GlobPattern pattern, ExecutionHandlerContext context) {
- super(client, context);
+ public PatternSubscription(Client client, GlobPattern pattern, ExecutionHandlerContext context,
+ Subscriptions subscriptions) {
+ super(client, context, subscriptions);
if (pattern == null) {
throw new IllegalArgumentException("pattern cannot be null");
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
index 659db58..7029b12 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
@@ -48,9 +48,9 @@ public interface PubSub {
* @param channel to subscribe to
* @param context ExecutionHandlerContext which will handle the client response
* @param client a Client instance making the request
- * @return the number of channels subscribed to
+ * @return the result of the subscribe
*/
- long subscribe(byte[] channel, ExecutionHandlerContext context, Client client);
+ SubscribeResult subscribe(byte[] channel, ExecutionHandlerContext context, Client client);
/**
* Subscribe to a pattern
@@ -58,9 +58,9 @@ public interface PubSub {
* @param pattern glob pattern to subscribe to
* @param context ExecutionHandlerContext which will handle the client response
* @param client a Client instance making the request
- * @return the number of channels subscribed to
+ * @return the result of the subscribe
*/
- long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, Client client);
+ SubscribeResult psubscribe(byte[] pattern, ExecutionHandlerContext context, Client client);
/**
* Unsubscribe a client from a channel
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
index edbf10d..6b9643f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
@@ -86,12 +86,13 @@ public class PubSubImpl implements PubSub {
}
@Override
- public long subscribe(byte[] channel, ExecutionHandlerContext context, Client client) {
+ public SubscribeResult subscribe(byte[] channel, ExecutionHandlerContext context, Client client) {
return subscriptions.subscribe(channel, context, client);
}
@Override
- public long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, Client client) {
+ public SubscribeResult psubscribe(byte[] pattern, ExecutionHandlerContext context,
+ Client client) {
return subscriptions.psubscribe(pattern, context, client);
}
@@ -142,7 +143,6 @@ public class PubSubImpl implements PubSub {
@VisibleForTesting
long publishMessageToSubscribers(byte[] channel, byte[] message) {
-
List<Subscription> foundSubscriptions = subscriptions
.findSubscriptions(channel);
if (foundSubscriptions.isEmpty()) {
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriberWithLatch.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/SubscribeResult.java
similarity index 52%
rename from geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriberWithLatch.java
rename to geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/SubscribeResult.java
index a3a1476..ffe6a01 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriberWithLatch.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/SubscribeResult.java
@@ -14,34 +14,34 @@
*
*/
-package org.apache.geode.redis.mocks;
+package org.apache.geode.redis.internal.pubsub;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
+public class SubscribeResult {
+ private final Subscription subscription;
+ private final long channelCount;
+ private final byte[] channel;
-import redis.clients.jedis.JedisPubSub;
-
-
-public class MockSubscriberWithLatch extends JedisPubSub {
- private CountDownLatch latch;
- private List<String> receivedMessages = new ArrayList<String>();
-
- public MockSubscriberWithLatch(CountDownLatch latch) {
- this.latch = latch;
+ public SubscribeResult(Subscription subscription, long channelCount, byte[] channel) {
+ this.subscription = subscription;
+ this.channelCount = channelCount;
+ this.channel = channel;
}
- public List<String> getReceivedMessages() {
- return receivedMessages;
+ /**
+ * Returns the Subscription instance this subscribe operation created; possibly null.
+ */
+ public Subscription getSubscription() {
+ return subscription;
}
- @Override
- public void onSubscribe(String channel, int subscribedChannels) {
- latch.countDown();
+ /**
+ * Returns the client's total subscription count after this subscribe operation.
+ */
+ public long getChannelCount() {
+ return channelCount;
}
- @Override
- public void onMessage(String channel, String message) {
- receivedMessages.add(message);
+ public byte[] getChannel() {
+ return channel;
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
index f8f4656..ba9dccd 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
@@ -57,4 +57,11 @@ public interface Subscription {
* pattern is returned.
*/
byte[] getChannelName();
+
+ /**
+ * Called once this subscriber is ready to have publishMessage called
+ */
+ void readyToPublish();
+
+ void shutdown();
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
index 1392ab2..76b4048 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
@@ -98,20 +98,29 @@ public class Subscriptions {
return subscriptions.size();
}
- public synchronized long subscribe(byte[] channel, ExecutionHandlerContext context,
+ public synchronized SubscribeResult subscribe(byte[] channel, ExecutionHandlerContext context,
Client client) {
+ Subscription createdSubscription = null;
if (!exists(channel, client)) {
- add(new ChannelSubscription(client, channel, context));
+ createdSubscription = new ChannelSubscription(client, channel, context, this);
+ add(createdSubscription);
}
- return findSubscriptions(client).size();
+ long channelCount = findSubscriptions(client).size();
+ return new SubscribeResult(createdSubscription, channelCount, channel);
}
- public synchronized long psubscribe(GlobPattern pattern, ExecutionHandlerContext context,
+ public SubscribeResult psubscribe(byte[] patternBytes, ExecutionHandlerContext context,
Client client) {
- if (!exists(pattern, client)) {
- add(new PatternSubscription(client, pattern, context));
+ GlobPattern pattern = new GlobPattern(new String(patternBytes));
+ Subscription createdSubscription = null;
+ synchronized (this) {
+ if (!exists(pattern, client)) {
+ createdSubscription = new PatternSubscription(client, pattern, context, this);
+ add(createdSubscription);
+ }
+ long channelCount = findSubscriptions(client).size();
+ return new SubscribeResult(createdSubscription, channelCount, patternBytes);
}
- return findSubscriptions(client).size();
}
public synchronized long unsubscribe(Object channelOrPattern, Client client) {
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/PubSubImplJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/PubSubImplJUnitTest.java
index 8136288..f38f0b7 100644
--- a/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/PubSubImplJUnitTest.java
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/PubSubImplJUnitTest.java
@@ -22,8 +22,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.DefaultChannelPromise;
@@ -43,15 +41,14 @@ public class PubSubImplJUnitTest {
ExecutionHandlerContext mockContext = mock(ExecutionHandlerContext.class);
FailingChannelFuture mockFuture = new FailingChannelFuture();
- when(mockContext.writeToChannel((ByteBuf) any())).thenReturn(mockFuture);
- when(mockContext.getByteBufAllocator()).thenReturn(new PooledByteBufAllocator());
+ when(mockContext.writeToChannel(any())).thenReturn(mockFuture);
Client deadClient = mock(Client.class);
when(deadClient.isDead()).thenReturn(true);
ChannelSubscription subscription =
- spy(new ChannelSubscription(deadClient,
- "sally".getBytes(), mockContext));
+ spy(new ChannelSubscription(deadClient, "sally".getBytes(), mockContext, subscriptions));
+ subscription.readyToPublish();
subscriptions.add(subscription);
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/SubscriptionsJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/SubscriptionsJUnitTest.java
index e72235a..9213f99 100644
--- a/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/SubscriptionsJUnitTest.java
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/SubscriptionsJUnitTest.java
@@ -36,7 +36,8 @@ public class SubscriptionsJUnitTest {
ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
Client client = new Client(channel);
- subscriptions.add(new ChannelSubscription(client, "subscriptions".getBytes(), context));
+ subscriptions
+ .add(new ChannelSubscription(client, "subscriptions".getBytes(), context, subscriptions));
assertThat(subscriptions.exists("subscriptions".getBytes(), client)).isTrue();
assertThat(subscriptions.exists("unknown".getBytes(), client)).isFalse();
@@ -51,7 +52,7 @@ public class SubscriptionsJUnitTest {
Client client = new Client(channel);
GlobPattern pattern = new GlobPattern("sub*s");
- subscriptions.add(new PatternSubscription(client, pattern, context));
+ subscriptions.add(new PatternSubscription(client, pattern, context, subscriptions));
assertThat(subscriptions.exists(pattern, client)).isTrue();
}
@@ -66,7 +67,8 @@ public class SubscriptionsJUnitTest {
GlobPattern globPattern1 = new GlobPattern("sub*s");
GlobPattern globPattern2 = new GlobPattern("subscriptions");
- subscriptions.add(new ChannelSubscription(client, "subscriptions".getBytes(), context));
+ subscriptions
+ .add(new ChannelSubscription(client, "subscriptions".getBytes(), context, subscriptions));
assertThat(subscriptions.exists(globPattern1, client)).isFalse();
assertThat(subscriptions.exists(globPattern2, client)).isFalse();
@@ -81,8 +83,9 @@ public class SubscriptionsJUnitTest {
Client client = new Client(channel);
GlobPattern globby = new GlobPattern("sub*s");
- subscriptions.add(new ChannelSubscription(client, "subscriptions".getBytes(), context));
- subscriptions.add(new PatternSubscription(client, globby, context));
+ subscriptions
+ .add(new ChannelSubscription(client, "subscriptions".getBytes(), context, subscriptions));
+ subscriptions.add(new PatternSubscription(client, globby, context, subscriptions));
assertThat(subscriptions.exists(globby, client)).isTrue();
assertThat(subscriptions.exists("subscriptions".getBytes(), client)).isTrue();
@@ -110,9 +113,9 @@ public class SubscriptionsJUnitTest {
Client clientTwo = new Client(mockChannelTwo);
ChannelSubscription subscriptionOne =
- new ChannelSubscription(clientOne, "subscriptions".getBytes(), context);
+ new ChannelSubscription(clientOne, "subscriptions".getBytes(), context, subscriptions);
ChannelSubscription subscriptionTwo =
- new ChannelSubscription(clientTwo, "monkeys".getBytes(), context);
+ new ChannelSubscription(clientTwo, "monkeys".getBytes(), context, subscriptions);
subscriptions.add(subscriptionOne);
subscriptions.add(subscriptionTwo);
@@ -130,9 +133,9 @@ public class SubscriptionsJUnitTest {
Client clientTwo = new Client(mockChannelTwo);
ChannelSubscription subscriptionOne =
- new ChannelSubscription(clientOne, "subscriptions".getBytes(), context);
+ new ChannelSubscription(clientOne, "subscriptions".getBytes(), context, subscriptions);
ChannelSubscription subscriptionTwo =
- new ChannelSubscription(clientTwo, "monkeys".getBytes(), context);
+ new ChannelSubscription(clientTwo, "monkeys".getBytes(), context, subscriptions);
subscriptions.add(subscriptionOne);
subscriptions.add(subscriptionTwo);
@@ -153,12 +156,12 @@ public class SubscriptionsJUnitTest {
Client client = new Client(mockChannelOne);
ChannelSubscription channelSubscriberOne =
- new ChannelSubscription(client, "subscriptions".getBytes(), context);
+ new ChannelSubscription(client, "subscriptions".getBytes(), context, subscriptions);
GlobPattern pattern = new GlobPattern("monkeys");
PatternSubscription patternSubscriber = new PatternSubscription(client,
- pattern, context);
+ pattern, context, subscriptions);
ChannelSubscription channelSubscriberTwo =
- new ChannelSubscription(client, "monkeys".getBytes(), context);
+ new ChannelSubscription(client, "monkeys".getBytes(), context, subscriptions);
subscriptions.add(channelSubscriberOne);
subscriptions.add(patternSubscriber);