You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/06/25 12:48:41 UTC
[geode] branch develop updated: Feature/expand pubsub support
(#5284)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 5e2baea Feature/expand pubsub support (#5284)
5e2baea is described below
commit 5e2baea7516b5e900f2699afddb63c59a20a1ab3
Author: Darrel Schneider <ds...@pivotal.io>
AuthorDate: Thu Jun 25 05:47:48 2020 -0700
Feature/expand pubsub support (#5284)
Co-authored-by: Jens Deppe <jd...@pivotal.io>
Co-authored-by: Sarah <sa...@pivotal.io>
Co-authored-by: Darrel Schneider <ds...@pivotal.io>
---
geode-redis/build.gradle | 1 +
.../executor/pubsub/PubSubIntegrationTest.java | 329 +++++++++++++++++++++
.../geode/redis/mocks/DummySubscription.java | 5 +
.../apache/geode/redis/mocks/MockSubscriber.java | 53 ++++
.../geode/redis/internal/GeodeRedisServer.java | 4 +-
.../geode/redis/internal/RedisCommandType.java | 4 +-
.../executor/pubsub/PunsubscribeExecutor.java | 56 +++-
.../executor/pubsub/UnsubscribeExecutor.java | 55 +++-
.../apache/geode/redis/internal/netty/Coder.java | 2 +-
.../internal/netty/ExecutionHandlerContext.java | 7 +
.../redis/internal/pubsub/ChannelSubscription.java | 4 +
.../redis/internal/pubsub/PatternSubscription.java | 4 +
.../apache/geode/redis/internal/pubsub/PubSub.java | 11 +
.../geode/redis/internal/pubsub/PubSubImpl.java | 29 +-
.../geode/redis/internal/pubsub/Subscription.java | 6 +
.../geode/redis/internal/pubsub/Subscriptions.java | 39 ++-
geode-redis/src/test/resources/expected-pom.xml | 11 +
17 files changed, 569 insertions(+), 51 deletions(-)
diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle
index 6932b29..bd58ce7 100644
--- a/geode-redis/build.gradle
+++ b/geode-redis/build.gradle
@@ -38,6 +38,7 @@ dependencies {
implementation('io.netty:netty-all')
implementation('org.apache.logging.log4j:log4j-api')
implementation('commons-codec:commons-codec')
+ implementation('org.apache.commons:commons-lang3')
testImplementation(project(':geode-junit'))
testImplementation('org.mockito:mockito-core')
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
index 7389d5b..20c4a60 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -29,6 +30,7 @@ import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import redis.clients.jedis.Jedis;
+import redis.clients.jedis.Protocol;
import org.apache.geode.redis.GeodeRedisServerRule;
import org.apache.geode.redis.mocks.MockBinarySubscriber;
@@ -66,6 +68,27 @@ public class PubSubIntegrationTest {
}
@Test
+ @SuppressWarnings("unchecked")
+ public void punsubscribe_whenNonexistent() {
+ assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.PUNSUBSCRIBE, "Nonexistent"))
+ .containsExactly("punsubscribe".getBytes(), "Nonexistent".getBytes(), 0L);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void unsubscribe_whenNoSubscriptionsExist_shouldNotHang() {
+ assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.UNSUBSCRIBE))
+ .containsExactly("unsubscribe".getBytes(), null, 0L);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void punsubscribe_whenNoSubscriptionsExist_shouldNotHang() {
+ assertThat((List<Object>) subscriber.sendCommand(Protocol.Command.PUNSUBSCRIBE))
+ .containsExactly("punsubscribe".getBytes(), null, 0L);
+ }
+
+ @Test
public void testOneSubscriberOneChannel() {
List<String> expectedMessages = Arrays.asList("hello");
@@ -90,6 +113,170 @@ public class PubSubIntegrationTest {
}
@Test
+ public void punsubscribe_givenSubscribe_doesNotReduceSubscriptions() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+
+ Runnable runnable = () -> {
+ subscriber.subscribe(mockSubscriber, "salutations");
+ };
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ try {
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+ mockSubscriber.punsubscribe("salutations");
+ waitFor(() -> mockSubscriber.punsubscribeInfos.size() == 1);
+
+ assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("salutations");
+ assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1);
+ assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+ } finally {
+ // now cleanup the actual subscription
+ mockSubscriber.unsubscribe("salutations");
+ waitFor(() -> !subscriberThread.isAlive());
+ }
+ }
+
+ @Test
+ public void unsubscribe_givenPsubscribe_doesNotReduceSubscriptions() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+
+ Runnable runnable = () -> {
+ subscriber.psubscribe(mockSubscriber, "salutations");
+ };
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ try {
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+ mockSubscriber.unsubscribe("salutations");
+ waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
+
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
+ assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+ } finally {
+ // now cleanup the actual subscription
+ mockSubscriber.punsubscribe("salutations");
+ waitFor(() -> !subscriberThread.isAlive());
+ }
+ }
+
+ @Test
+ public void unsubscribe_onNonExistentSubscription_doesNotReduceSubscriptions() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+ Runnable runnable = () -> {
+ subscriber.subscribe(mockSubscriber, "salutations");
+ };
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ try {
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+ mockSubscriber.unsubscribe("NonExistent");
+ waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
+
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("NonExistent");
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
+ assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+ } finally {
+ // now cleanup the actual subscription
+ mockSubscriber.unsubscribe("salutations");
+ waitFor(() -> !subscriberThread.isAlive());
+ }
+ }
+
+ @Test
+ public void unsubscribe_whenGivenAnEmptyString() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+ Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations");
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ try {
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+ mockSubscriber.unsubscribe("");
+ waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
+
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("");
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
+ assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+ } finally {
+ // now cleanup the actual subscription
+ mockSubscriber.unsubscribe();
+ waitFor(() -> !subscriberThread.isAlive());
+ }
+ }
+
+ @Test
+ public void unsubscribeWithEmptyChannel_doesNotUnsubscribeExistingChannels() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+ Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations");
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ try {
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+ mockSubscriber.unsubscribe("");
+ waitFor(() -> mockSubscriber.unsubscribeInfos.size() == 1);
+
+ Long result = publisher.publish("salutations", "heyho");
+ waitFor(() -> mockSubscriber.getReceivedMessages().size() == 1);
+
+ assertThat(result).isEqualTo(1);
+ assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo("heyho");
+ } finally {
+ // now cleanup the actual subscription
+ mockSubscriber.unsubscribe();
+ waitFor(() -> !subscriberThread.isAlive());
+ }
+ }
+
+ @Test
+ public void canSubscribeToAnEmptyString() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+ Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "");
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+ Long result = publisher.publish("", "blank");
+ assertThat(result).isEqualTo(1);
+
+ mockSubscriber.unsubscribe("");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriberThread.isAlive());
+
+ assertThat(mockSubscriber.getReceivedMessages()).containsExactly("blank");
+ }
+
+ @Test
+ public void punsubscribe_onNonExistentSubscription_doesNotReduceSubscriptions() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+ Runnable runnable = () -> {
+ subscriber.psubscribe(mockSubscriber, "salutations");
+ };
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ try {
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+ mockSubscriber.punsubscribe("NonExistent");
+ waitFor(() -> mockSubscriber.punsubscribeInfos.size() == 1);
+
+ assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("NonExistent");
+ assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1);
+ assertThat(mockSubscriber.getSubscribedChannels()).isEqualTo(1);
+ } finally {
+ // now cleanup the actual subscription
+ mockSubscriber.punsubscribe("salutations");
+ waitFor(() -> !subscriberThread.isAlive());
+ }
+ }
+
+ @Test
public void testPublishBinaryData() {
byte[] expectedMessage = new byte[256];
for (int i = 0; i < 256; i++) {
@@ -117,6 +304,33 @@ public class PubSubIntegrationTest {
}
@Test
+ public void testSubscribeAndPublishUsingBinaryData() {
+ byte[] binaryBlob = new byte[256];
+ for (int i = 0; i < 256; i++) {
+ binaryBlob[i] = (byte) i;
+ }
+
+ MockBinarySubscriber mockSubscriber = new MockBinarySubscriber();
+
+ Runnable runnable = () -> {
+ subscriber.subscribe(mockSubscriber, binaryBlob);
+ };
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+
+ Long result = publisher.publish(binaryBlob, binaryBlob);
+ assertThat(result).isEqualTo(1);
+
+ mockSubscriber.unsubscribe(binaryBlob);
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriberThread.isAlive());
+
+ assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(binaryBlob);
+ }
+
+ @Test
public void testOneSubscriberSubscribingToTwoChannels() {
List<String> expectedMessages = Arrays.asList("hello", "howdy");
MockSubscriber mockSubscriber = new MockSubscriber();
@@ -135,14 +349,111 @@ public class PubSubIntegrationTest {
assertThat(result).isEqualTo(1);
mockSubscriber.unsubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+ assertThat(mockSubscriber.unsubscribeInfos).hasSize(1);
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(1);
+ mockSubscriber.unsubscribeInfos.clear();
mockSubscriber.unsubscribe("yuletide");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ assertThat(mockSubscriber.unsubscribeInfos).hasSize(1);
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("yuletide");
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(0);
waitFor(() -> !subscriberThread.isAlive());
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(expectedMessages);
}
@Test
+ public void testSubscribingAndUnsubscribingFromMultipleChannels() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+
+ Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations", "yuletide");
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+
+ mockSubscriber.unsubscribe("yuletide", "salutations");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriberThread.isAlive());
+
+ List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
+ .map(x -> x.channel).collect(Collectors.toList());
+ assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");
+
+ List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
+ .map(x -> x.count).collect(Collectors.toList());
+ assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);
+
+ }
+
+ @Test
+ public void testUnsubscribingImplicitlyFromAllChannels() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+
+ Runnable runnable = () -> subscriber.subscribe(mockSubscriber, "salutations", "yuletide");
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+
+ mockSubscriber.unsubscribe();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriberThread.isAlive());
+
+ List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
+ .map(x -> x.channel).collect(Collectors.toList());
+ assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");
+
+ List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
+ .map(x -> x.count).collect(Collectors.toList());
+ assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);
+
+ Long result = publisher.publish("salutations", "greetings");
+ assertThat(result).isEqualTo(0);
+ }
+
+ @Test
+ public void testPsubscribingAndPunsubscribingFromMultipleChannels() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+
+ Runnable runnable = () -> subscriber.psubscribe(mockSubscriber, "sal*", "yul*");
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+
+ mockSubscriber.punsubscribe("yul*", "sal*");
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriberThread.isAlive());
+ assertThat(mockSubscriber.punsubscribeInfos).containsExactly(
+ new MockSubscriber.UnsubscribeInfo("yul*", 1),
+ new MockSubscriber.UnsubscribeInfo("sal*", 0));
+ }
+
+ @Test
+ public void testPunsubscribingImplicitlyFromAllChannels() {
+ MockSubscriber mockSubscriber = new MockSubscriber();
+
+ Runnable runnable = () -> subscriber.psubscribe(mockSubscriber, "sal*", "yul*");
+
+ Thread subscriberThread = new Thread(runnable);
+ subscriberThread.start();
+
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 2);
+
+ mockSubscriber.punsubscribe();
+ waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
+ waitFor(() -> !subscriberThread.isAlive());
+ assertThat(mockSubscriber.punsubscribeInfos).containsExactly(
+ new MockSubscriber.UnsubscribeInfo("sal*", 1),
+ new MockSubscriber.UnsubscribeInfo("yul*", 0));
+ }
+
+ @Test
public void testTwoSubscribersOneChannel() {
Jedis subscriber2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
MockSubscriber mockSubscriber1 = new MockSubscriber();
@@ -164,12 +475,18 @@ public class PubSubIntegrationTest {
mockSubscriber1.unsubscribe("salutations");
waitFor(() -> mockSubscriber1.getSubscribedChannels() == 0);
waitFor(() -> !subscriber1Thread.isAlive());
+ assertThat(mockSubscriber1.unsubscribeInfos).hasSize(1);
+ assertThat(mockSubscriber1.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+ assertThat(mockSubscriber1.unsubscribeInfos.get(0).count).isEqualTo(0);
result = publisher.publish("salutations", "goodbye");
assertThat(result).isEqualTo(1);
mockSubscriber2.unsubscribe("salutations");
waitFor(() -> mockSubscriber2.getSubscribedChannels() == 0);
waitFor(() -> !subscriber2Thread.isAlive());
+ assertThat(mockSubscriber2.unsubscribeInfos).hasSize(1);
+ assertThat(mockSubscriber2.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+ assertThat(mockSubscriber2.unsubscribeInfos.get(0).count).isEqualTo(0);
assertThat(mockSubscriber1.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
assertThat(mockSubscriber2.getReceivedMessages()).isEqualTo(Arrays.asList("hello", "goodbye"));
@@ -201,6 +518,8 @@ public class PubSubIntegrationTest {
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
+ assertThat(mockSubscriber.unsubscribeInfos)
+ .containsExactly(new MockSubscriber.UnsubscribeInfo("salutations", 0));
assertThat(mockSubscriber.getReceivedMessages()).isEqualTo(Collections.singletonList("hello"));
}
@@ -252,6 +571,8 @@ public class PubSubIntegrationTest {
mockSubscriber.punsubscribe("sal*s");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
+ assertThat(mockSubscriber.punsubscribeInfos)
+ .containsExactly(new MockSubscriber.UnsubscribeInfo("sal*s", 0));
}
@Test
@@ -283,6 +604,8 @@ public class PubSubIntegrationTest {
mockSubscriber.punsubscribe("sal*s");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
+ assertThat(mockSubscriber.punsubscribeInfos)
+ .containsExactly(new MockSubscriber.UnsubscribeInfo("sal*s", 0));
}
@Test
@@ -305,11 +628,17 @@ public class PubSubIntegrationTest {
mockSubscriber.punsubscribe("sal*s");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 1);
+ assertThat(mockSubscriber.punsubscribeInfos).hasSize(1);
+ assertThat(mockSubscriber.punsubscribeInfos.get(0).channel).isEqualTo("sal*s");
+ assertThat(mockSubscriber.punsubscribeInfos.get(0).count).isEqualTo(1);
mockSubscriber.unsubscribe("salutations");
waitFor(() -> mockSubscriber.getSubscribedChannels() == 0);
waitFor(() -> !subscriberThread.isAlive());
+ assertThat(mockSubscriber.unsubscribeInfos).hasSize(1);
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).channel).isEqualTo("salutations");
+ assertThat(mockSubscriber.unsubscribeInfos.get(0).count).isEqualTo(0);
assertThat(mockSubscriber.getReceivedMessages()).containsExactly("hello");
assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
index 52db20e..d3ba73b 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
@@ -47,4 +47,9 @@ public class DummySubscription implements Subscription {
public List<Object> createResponse(String channel, byte[] message) {
return null;
}
+
+ @Override
+ public String getChannelName() {
+ return null;
+ }
}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
index 5ba976e..e0ecb43 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
@@ -19,6 +19,7 @@ package org.apache.geode.redis.mocks;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import redis.clients.jedis.JedisPubSub;
@@ -34,6 +35,58 @@ public class MockSubscriber extends JedisPubSub {
return new ArrayList<>(receivedPMessages);
}
+ public final List<UnsubscribeInfo> unsubscribeInfos =
+ Collections.synchronizedList(new ArrayList<>());
+ public final List<UnsubscribeInfo> punsubscribeInfos =
+ Collections.synchronizedList(new ArrayList<>());
+
+ @Override
+ public void onUnsubscribe(String channel, int subscribedChannels) {
+ unsubscribeInfos.add(new UnsubscribeInfo(channel, subscribedChannels));
+ }
+
+ @Override
+ public void onPUnsubscribe(String pattern, int subscribedChannels) {
+ punsubscribeInfos.add(new UnsubscribeInfo(pattern, subscribedChannels));
+ }
+
+ public static class UnsubscribeInfo {
+ public final String channel;
+ public final int count;
+
+ public UnsubscribeInfo(String channel, int count) {
+ this.channel = channel;
+ this.count = count;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof UnsubscribeInfo)) {
+ return false;
+ }
+ UnsubscribeInfo that = (UnsubscribeInfo) o;
+ return count == that.count &&
+ Objects.equals(channel, that.channel);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(channel, count);
+ }
+
+ @Override
+ public String toString() {
+ return "UnsubscribeInfo{" +
+ "channel='" + channel + '\'' +
+ ", count=" + count +
+ '}';
+ }
+ }
+
+
@Override
public void onMessage(String channel, String message) {
receivedMessages.add(message);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index 0b66d64..9a860bf 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -46,6 +46,7 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.util.concurrent.Future;
+import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.Experimental;
@@ -416,7 +417,8 @@ public class GeodeRedisServer {
InternalDistributedSystem system = (InternalDistributedSystem) cache.getDistributedSystem();
String redisPassword = system.getConfig().getRedisPassword();
- final byte[] redisPasswordBytes = Coder.stringToBytes(redisPassword);
+ final byte[] redisPasswordBytes =
+ StringUtils.isBlank(redisPassword) ? null : Coder.stringToBytes(redisPassword);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup).channel(socketClass)
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index 4ec0ac6..2c7044b 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -164,9 +164,9 @@ public enum RedisCommandType {
SUBSCRIBE(new SubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)),
PUBLISH(new PublishExecutor(), SUPPORTED, new ExactParameterRequirements(3)),
- UNSUBSCRIBE(new UnsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)),
+ UNSUBSCRIBE(new UnsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(1)),
PSUBSCRIBE(new PsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)),
- PUNSUBSCRIBE(new PunsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(2)),
+ PUNSUBSCRIBE(new PunsubscribeExecutor(), SUPPORTED, new MinimumParameterRequirements(1)),
UNKNOWN(new UnknownExecutor(), SUPPORTED),
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
index 9b49127..b71ca68 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
@@ -17,10 +17,14 @@
package org.apache.geode.redis.internal.executor.pubsub;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
import org.apache.geode.redis.internal.executor.AbstractExecutor;
import org.apache.geode.redis.internal.executor.GlobPattern;
import org.apache.geode.redis.internal.executor.RedisResponse;
@@ -33,17 +37,45 @@ public class PunsubscribeExecutor extends AbstractExecutor {
@Override
public RedisResponse executeCommand(Command command,
ExecutionHandlerContext context) {
- byte[] pattern = command.getProcessedCommand().get(1);
- long subscriptionCount =
- context
- .getPubSub()
- .punsubscribe(new GlobPattern(new String(pattern)), context.getClient());
-
- ArrayList<Object> items = new ArrayList<>();
- items.add("punsubscribe");
- items.add(pattern);
- items.add(subscriptionCount);
-
- return RedisResponse.array(items);
+
+ List<String> channelNames = extractChannelNames(command);
+ if (channelNames.isEmpty()) {
+ channelNames = context.getPubSub().findSubscribedChannels(context.getClient());
+ }
+
+ Collection<Collection<?>> response = punsubscribe(context, channelNames);
+
+ return RedisResponse.flattenedArray(response);
+ }
+
+ private List<String> extractChannelNames(Command command) {
+ return command.getProcessedCommandWrappers().stream()
+ .skip(1)
+ .map(ByteArrayWrapper::toString)
+ .collect(Collectors.toList());
+ }
+
+ private Collection<Collection<?>> punsubscribe(ExecutionHandlerContext context,
+ List<String> channelNames) {
+ Collection<Collection<?>> response = new ArrayList<>();
+
+ if (channelNames.isEmpty()) {
+ response.add(createItem(null, 0));
+ } else {
+ for (String channel : channelNames) {
+ long subscriptionCount =
+ context.getPubSub().punsubscribe(new GlobPattern(channel), context.getClient());
+ response.add(createItem(channel, subscriptionCount));
+ }
+ }
+ return response;
+ }
+
+ private ArrayList<Object> createItem(String channel, long subscriptionCount) {
+ ArrayList<Object> oneItem = new ArrayList<>();
+ oneItem.add("punsubscribe");
+ oneItem.add(channel);
+ oneItem.add(subscriptionCount);
+ return oneItem;
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java
index 853a29e..09d1a9d 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java
@@ -16,34 +16,63 @@
package org.apache.geode.redis.internal.executor.pubsub;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import java.util.stream.Collectors;
-import org.apache.logging.log4j.Logger;
-
-import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
import org.apache.geode.redis.internal.executor.AbstractExecutor;
import org.apache.geode.redis.internal.executor.RedisResponse;
import org.apache.geode.redis.internal.netty.Command;
import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
public class UnsubscribeExecutor extends AbstractExecutor {
- private static final Logger logger = LogService.getLogger();
@Override
public RedisResponse executeCommand(Command command,
ExecutionHandlerContext context) {
- List<byte[]> commandElems = command.getProcessedCommand();
- byte[] channelName = commandElems.get(1);
- long subscriptionCount =
- context.getPubSub().unsubscribe(new String(channelName), context.getClient());
+ List<String> channelNames = extractChannelNames(command);
+ if (channelNames.isEmpty()) {
+ channelNames = context.getPubSub().findSubscribedChannels(context.getClient());
+ }
+
+ Collection<Collection<?>> response = unsubscribe(context, channelNames);
+
+ return RedisResponse.flattenedArray(response);
+ }
+
+ private List<String> extractChannelNames(Command command) {
+ return command.getProcessedCommandWrappers().stream()
+ .skip(1)
+ .map(ByteArrayWrapper::toString)
+ .collect(Collectors.toList());
+ }
+
+ private Collection<Collection<?>> unsubscribe(ExecutionHandlerContext context,
+ List<String> channelNames) {
+ Collection<Collection<?>> response = new ArrayList<>();
+
+ if (channelNames.isEmpty()) {
+ response.add(createItem(null, 0));
+ } else {
+ for (String channel : channelNames) {
+ long subscriptionCount =
+ context.getPubSub().unsubscribe(channel, context.getClient());
- ArrayList<Object> items = new ArrayList<>();
- items.add("unsubscribe");
- items.add(channelName);
- items.add(subscriptionCount);
+ response.add(createItem(channel, subscriptionCount));
+ }
+ }
+
+ return response;
+ }
- return RedisResponse.array(items);
+ private ArrayList<Object> createItem(String channelName, long subscriptionCount) {
+ ArrayList<Object> oneItem = new ArrayList<>();
+ oneItem.add("unsubscribe");
+ oneItem.add(channelName);
+ oneItem.add(subscriptionCount);
+ return oneItem;
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
index 0f02a7c..6c06c08 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
@@ -352,7 +352,7 @@ public class Coder {
}
public static byte[] stringToBytes(String string) {
- if (string == null || string.equals("")) {
+ if (string == null) {
return null;
}
try {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 297abb3..7ce9528 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -182,6 +182,13 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
return;
}
+ if (command.isOfType(RedisCommandType.SELECT)
+ || command.isOfType(RedisCommandType.CONFIG)
+ || command.isOfType(RedisCommandType.PUBSUB)) {
+ writeToChannel(RedisResponse.ok());
+ return;
+ }
+
if (command.isUnsupported() && !allowUnsupportedCommands()) {
writeToChannel(
RedisResponse.error(command.getCommandType() + RedisConstants.ERROR_UNSUPPORTED_COMMAND));
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
index dcf2e50..9e0e4b8 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
@@ -53,4 +53,8 @@ class ChannelSubscription extends AbstractSubscription {
return this.channel.equals(channel);
}
+ @Override
+ public String getChannelName() {
+ return channel;
+ }
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
index d19c870..2fbcdfe 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
@@ -54,4 +54,8 @@ class PatternSubscription extends AbstractSubscription {
return pattern.matches(channel);
}
+ @Override
+ public String getChannelName() {
+ return pattern.globPattern();
+ }
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
index 22b9e40..69aebc4 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
@@ -16,6 +16,8 @@
package org.apache.geode.redis.internal.pubsub;
+import java.util.List;
+
import org.apache.geode.cache.Region;
import org.apache.geode.redis.internal.data.ByteArrayWrapper;
import org.apache.geode.redis.internal.data.RedisData;
@@ -77,4 +79,13 @@ public interface PubSub {
* @return the number of channels still subscribed to by the client
*/
long punsubscribe(GlobPattern pattern, Client client);
+
+ /**
+ * Return a list of channel names that a client has subscribed to
+ *
+ * @param client the Client which is to be queried
+ * @return the list of channels
+ */
+ List<String> findSubscribedChannels(Client client);
+
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
index 71deb07..d6b5788 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
@@ -19,6 +19,7 @@ package org.apache.geode.redis.internal.pubsub;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger;
@@ -86,23 +87,12 @@ public class PubSubImpl implements PubSub {
@Override
public long subscribe(String channel, ExecutionHandlerContext context, Client client) {
- if (subscriptions.exists(channel, client)) {
- return subscriptions.findSubscriptions(client).size();
- }
- Subscription subscription = new ChannelSubscription(client, channel, context);
- subscriptions.add(subscription);
- return subscriptions.findSubscriptions(client).size();
+ return subscriptions.subscribe(channel, context, client);
}
@Override
public long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, Client client) {
- if (subscriptions.exists(pattern, client)) {
- return subscriptions.findSubscriptions(client).size();
- }
- Subscription subscription = new PatternSubscription(client, pattern, context);
- subscriptions.add(subscription);
-
- return subscriptions.findSubscriptions(client).size();
+ return subscriptions.psubscribe(pattern, context, client);
}
private void registerPublishFunction() {
@@ -135,14 +125,19 @@ public class PubSubImpl implements PubSub {
@Override
public long unsubscribe(String channel, Client client) {
- subscriptions.remove(channel, client);
- return subscriptions.findSubscriptions(client).size();
+ return subscriptions.unsubscribe(channel, client);
}
@Override
public long punsubscribe(GlobPattern pattern, Client client) {
- subscriptions.remove(pattern, client);
- return subscriptions.findSubscriptions(client).size();
+ return subscriptions.unsubscribe(pattern, client);
+ }
+
+ @Override
+ public List<String> findSubscribedChannels(Client client) {
+ return subscriptions.findSubscriptions(client).stream()
+ .map(Subscription::getChannelName)
+ .collect(Collectors.toList());
}
@VisibleForTesting
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
index 4490917..7695a17 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
@@ -51,4 +51,10 @@ public interface Subscription {
* The response dependent on the type of the subscription
*/
List<Object> createResponse(String channel, byte[] message);
+
+ /**
+ * Return the subscription name. In the case of a pattern the string representation of the
+ * pattern is returned.
+ */
+ String getChannelName();
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
index fc73a79..8f7e732 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
@@ -20,7 +20,10 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.redis.internal.executor.GlobPattern;
import org.apache.geode.redis.internal.netty.Client;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
/**
* Class that manages both channel and pattern subscriptions.
@@ -34,7 +37,8 @@ public class Subscriptions {
* @param channelOrPattern channel or pattern
* @param client a client connection
*/
- public boolean exists(Object channelOrPattern, Client client) {
+ @VisibleForTesting
+ boolean exists(Object channelOrPattern, Client client) {
return subscriptions.stream()
.anyMatch(subscription -> subscription.isEqualTo(channelOrPattern, client));
}
@@ -66,7 +70,8 @@ public class Subscriptions {
/**
* Add a new subscription
*/
- public void add(Subscription subscription) {
+ @VisibleForTesting
+ void add(Subscription subscription) {
subscriptions.add(subscription);
}
@@ -80,14 +85,38 @@ public class Subscriptions {
/**
* Remove a single subscription
*/
- public void remove(Object channel, Client client) {
- subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
+ @VisibleForTesting
+ boolean remove(Object channel, Client client) {
+ return subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
}
/**
* @return the total number of all local subscriptions
*/
- public int size() {
+ @VisibleForTesting
+ int size() {
return subscriptions.size();
}
+
+ public synchronized long subscribe(String channel, ExecutionHandlerContext context,
+ Client client) {
+ if (!exists(channel, client)) {
+ add(new ChannelSubscription(client, channel, context));
+ }
+ return findSubscriptions(client).size();
+ }
+
+ public synchronized long psubscribe(GlobPattern pattern, ExecutionHandlerContext context,
+ Client client) {
+ if (!exists(pattern, client)) {
+ add(new PatternSubscription(client, pattern, context));
+ }
+ return findSubscriptions(client).size();
+ }
+
+ public synchronized long unsubscribe(Object channelOrPattern, Client client) {
+ remove(channelOrPattern, client);
+ return findSubscriptions(client).size();
+ }
+
}
diff --git a/geode-redis/src/test/resources/expected-pom.xml b/geode-redis/src/test/resources/expected-pom.xml
index b3f8d0b..c17b109 100644
--- a/geode-redis/src/test/resources/expected-pom.xml
+++ b/geode-redis/src/test/resources/expected-pom.xml
@@ -134,5 +134,16 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <scope>runtime</scope>
+ <exclusions>
+ <exclusion>
+ <artifactId>spring-boot-starter-logging</artifactId>
+ <groupId>org.springframework.boot</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>