You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by on...@apache.org on 2020/04/10 06:46:08 UTC
[geode] branch develop updated: GEODE-7943 add synchronization to Subscriptions class (#4913)
This is an automated email from the ASF dual-hosted git repository.
onichols pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 51477e9 GEODE-7943 add synchronization to Subscriptions class (#4913)
51477e9 is described below
commit 51477e91cd65a14e9396fb7da2876cd8bf2b7dee
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Thu Apr 9 23:45:41 2020 -0700
GEODE-7943 add synchronization to Subscriptions class (#4913)
* Add synchronized to size()
* Move test to integrationTest
* Switch to CopyOnWriteArrayList (it appears not to really introduce memory issues)
* Fix a small cleanup issue in ExpireAtIntegrationTest
Authored-by: Jens Deppe <jd...@vmware.com>
---
.../redis/general/ExpireAtIntegrationTest.java | 17 +-
.../geode/redis/internal/DummySubscription.java | 47 +++++
.../internal/SubscriptionsIntegrationTest.java | 194 +++++++++++++++++++++
.../apache/geode/redis/internal/Subscriptions.java | 19 +-
4 files changed, 268 insertions(+), 9 deletions(-)
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExpireAtIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExpireAtIntegrationTest.java
index 5d80d31..9ea5dd4 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExpireAtIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExpireAtIntegrationTest.java
@@ -15,6 +15,9 @@
package org.apache.geode.redis.general;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.assertj.core.api.Assertions.assertThat;
import org.junit.After;
@@ -24,6 +27,8 @@ import org.junit.BeforeClass;
import org.junit.Test;
import redis.clients.jedis.Jedis;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.GemFireCache;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.redis.GeodeRedisServer;
import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -35,13 +40,19 @@ public class ExpireAtIntegrationTest {
private static GeodeRedisServer server;
private long unixTimeStampInTheFutureInSeconds;
private long unixTimeStampFromThePast = 0L;
- String key = "key";
- String value = "value";
+ private static String key = "key";
+ private static String value = "value";
+ private static GemFireCache cache;
@BeforeClass
public static void setUp() {
int port = AvailablePortHelper.getRandomAvailableTCPPort();
+ CacheFactory cf = new CacheFactory();
+ cf.set(LOG_LEVEL, "error");
+ cf.set(MCAST_PORT, "0");
+ cf.set(LOCATORS, "");
+ cache = cf.create();
server = new GeodeRedisServer("localhost", port);
server.start();
jedis = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT);
@@ -52,7 +63,6 @@ public class ExpireAtIntegrationTest {
unixTimeStampInTheFutureInSeconds = (System.currentTimeMillis() / 1000) + 60;
}
-
@After
public void testLevelTearDown() {
jedis.flushAll();
@@ -61,6 +71,7 @@ public class ExpireAtIntegrationTest {
@AfterClass
public static void classLevelTearDown() {
jedis.close();
+ cache.close();
server.shutdown();
}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/DummySubscription.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/DummySubscription.java
new file mode 100644
index 0000000..8868ad4
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/DummySubscription.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import java.util.List;
+
+public class DummySubscription implements Subscription {
+
+ @Override
+ public boolean isEqualTo(Object channelOrPattern, Client client) {
+ return false;
+ }
+
+ @Override
+ public PublishResult publishMessage(String channel, byte[] message) {
+ return null;
+ }
+
+ @Override
+ public boolean matchesClient(Client client) {
+ return false;
+ }
+
+ @Override
+ public boolean matches(String channel) {
+ return false;
+ }
+
+ @Override
+ public List<Object> createResponse(String channel, byte[] message) {
+ return null;
+ }
+}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/SubscriptionsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/SubscriptionsIntegrationTest.java
new file mode 100644
index 0000000..dccebfc
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/SubscriptionsIntegrationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+import io.netty.channel.Channel;
+import org.junit.AfterClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class SubscriptionsIntegrationTest {
+
+ private static final int ITERATIONS = 1000;
+
+ @ClassRule
+ public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
+ private Callable<Void> functionSpinner(Consumer<Void> consumer) {
+ return () -> {
+ for (int i = 0; i < ITERATIONS; i++) {
+ consumer.accept(null);
+ Thread.yield();
+ }
+ return null;
+ };
+ }
+
+ @AfterClass
+ public static void after() {
+ System.out.println("done");
+ }
+
+ @Test
+ public void add_doesNotThrowException_whenListIsConcurrentlyModified()
+ throws Exception {
+ final Subscriptions subscriptions = new Subscriptions();
+
+ Callable<Void> addingCallable1 =
+ functionSpinner(x -> subscriptions.add(new DummySubscription()));
+ Callable<Void> addingCallable2 =
+ functionSpinner(x -> subscriptions.add(new DummySubscription()));
+
+ Future<Void> addingFuture = executor.submit(addingCallable1);
+ Future<Void> existsFuture = executor.submit(addingCallable2);
+
+ addingFuture.get();
+ existsFuture.get();
+
+ assertThat(subscriptions.size()).isEqualTo(ITERATIONS * 2);
+ }
+
+ @Test
+ public void exists_doesNotThrowException_whenListIsConcurrentlyModified()
+ throws Exception {
+ final Subscriptions subscriptions = new Subscriptions();
+
+ Callable<Void> addingCallable =
+ functionSpinner(x -> subscriptions.add(new DummySubscription()));
+ Callable<Void> existsCallable =
+ functionSpinner(x -> subscriptions.exists("channel", mock(Client.class)));
+
+ Future<Void> addingFuture = executor.submit(addingCallable);
+ Future<Void> existsFuture = executor.submit(existsCallable);
+
+ addingFuture.get();
+ existsFuture.get();
+
+ assertThat(subscriptions.size()).isEqualTo(ITERATIONS);
+ }
+
+ @Test
+ public void findSubscriptionsByClient_doesNotThrowException_whenListIsConcurrentlyModified()
+ throws Exception {
+ final Subscriptions subscriptions = new Subscriptions();
+
+ Callable<Void> addingCallable =
+ functionSpinner(x -> subscriptions.add(new DummySubscription()));
+ Callable<Void> findSubscriptionsCallable =
+ functionSpinner(x -> subscriptions.findSubscriptions(mock(Client.class)));
+
+ Future<Void> addingFuture = executor.submit(addingCallable);
+ Future<Void> existsFuture = executor.submit(findSubscriptionsCallable);
+
+ addingFuture.get();
+ existsFuture.get();
+
+ assertThat(subscriptions.size()).isEqualTo(ITERATIONS);
+ }
+
+ @Test
+ public void findSubscriptionsByChannel_doesNotThrowException_whenListIsConcurrentlyModified()
+ throws Exception {
+ final Subscriptions subscriptions = new Subscriptions();
+
+ Callable<Void> addingCallable =
+ functionSpinner(x -> subscriptions.add(new DummySubscription()));
+ Callable<Void> findSubscriptionsCallable =
+ functionSpinner(x -> subscriptions.findSubscriptions("channel"));
+
+ Future<Void> addingFuture = executor.submit(addingCallable);
+ Future<Void> existsFuture = executor.submit(findSubscriptionsCallable);
+
+ addingFuture.get();
+ existsFuture.get();
+
+ assertThat(subscriptions.size()).isEqualTo(ITERATIONS);
+ }
+
+ @Test
+ public void removeByClient_doesNotThrowException_whenListIsConcurrentlyModified()
+ throws Exception {
+ final Subscriptions subscriptions = new Subscriptions();
+
+ List<Client> clients = new LinkedList<>();
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ for (int i = 0; i < ITERATIONS; i++) {
+ Client client = new Client(mock(Channel.class));
+ clients.add(client);
+ subscriptions.add(new ChannelSubscription(client, "channel", context));
+ }
+
+ Callable<Void> removeCallable = () -> {
+ clients.forEach(c -> subscriptions.remove(c));
+ return null;
+ };
+ Callable<Void> existsCallable = () -> {
+ clients.forEach(c -> subscriptions.exists("channel", c));
+ return null;
+ };
+
+ Future<Void> removeFuture = executor.submit(removeCallable);
+ Future<Void> existsFuture = executor.submit(existsCallable);
+
+ removeFuture.get();
+ existsFuture.get();
+
+ assertThat(subscriptions.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void removeByChannelAndClient_doesNotThrowException_whenListIsConcurrentlyModified()
+ throws Exception {
+ final Subscriptions subscriptions = new Subscriptions();
+
+ List<Client> clients = new LinkedList<>();
+ ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+ for (int i = 0; i < ITERATIONS; i++) {
+ Client client = new Client(mock(Channel.class));
+ clients.add(client);
+ subscriptions.add(new ChannelSubscription(client, "channel", context));
+ }
+
+ Callable<Void> removeCallable = () -> {
+ clients.forEach(c -> subscriptions.remove("channel", c));
+ return null;
+ };
+ Callable<Void> existsCallable = () -> {
+ clients.forEach(c -> subscriptions.exists("channel", c));
+ return null;
+ };
+
+ Future<Void> removeFuture = executor.submit(removeCallable);
+ Future<Void> existsFuture = executor.submit(existsCallable);
+
+ removeFuture.get();
+ existsFuture.get();
+
+ assertThat(subscriptions.size()).isEqualTo(0);
+ }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
index b66770a..c24b450 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
@@ -16,15 +16,15 @@
package org.apache.geode.redis.internal;
-import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
/**
* Class that manages both channel and pattern subscriptions.
*/
public class Subscriptions {
- private List<Subscription> subscriptions = new ArrayList<>();
+ private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();
/**
* Check whether a given client has already subscribed to a channel or pattern
@@ -56,7 +56,7 @@ public class Subscriptions {
* @return a list of subscriptions
*/
public List<Subscription> findSubscriptions(String channelOrPattern) {
- return this.subscriptions.stream()
+ return subscriptions.stream()
.filter(subscription -> subscription.matches(channelOrPattern))
.collect(Collectors.toList());
}
@@ -65,20 +65,27 @@ public class Subscriptions {
* Add a new subscription
*/
public void add(Subscription subscription) {
- this.subscriptions.add(subscription);
+ subscriptions.add(subscription);
}
/**
* Remove all subscriptions for a given client
*/
public void remove(Client client) {
- this.subscriptions.removeIf(subscription -> subscription.matchesClient(client));
+ subscriptions.removeIf(subscription -> subscription.matchesClient(client));
}
/**
* Remove a single subscription
*/
public void remove(Object channel, Client client) {
- this.subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
+ subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
+ }
+
+ /**
+ * @return the total number of all local subscriptions
+ */
+ public int size() {
+ return subscriptions.size();
}
}