You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by on...@apache.org on 2020/04/10 06:46:08 UTC

[geode] branch develop updated: GEODE-7943 add synchronization to Subscriptions class (#4913)

This is an automated email from the ASF dual-hosted git repository.

onichols pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 51477e9  GEODE-7943 add synchronization to Subscriptions class (#4913)
51477e9 is described below

commit 51477e91cd65a14e9396fb7da2876cd8bf2b7dee
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Thu Apr 9 23:45:41 2020 -0700

    GEODE-7943 add synchronization to Subscriptions class (#4913)
    
    * Add synchronized to size()
    * Move test to integrationTest
    * Switch to CopyOnWriteArrayList (it appears not to really introduce memory issues)
    * Fix a small cleanup issue in ExpireAtIntegrationTest
    
    Authored-by: Jens Deppe <jd...@vmware.com>
---
 .../redis/general/ExpireAtIntegrationTest.java     |  17 +-
 .../geode/redis/internal/DummySubscription.java    |  47 +++++
 .../internal/SubscriptionsIntegrationTest.java     | 194 +++++++++++++++++++++
 .../apache/geode/redis/internal/Subscriptions.java |  19 +-
 4 files changed, 268 insertions(+), 9 deletions(-)

diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExpireAtIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExpireAtIntegrationTest.java
index 5d80d31..9ea5dd4 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExpireAtIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExpireAtIntegrationTest.java
@@ -15,6 +15,9 @@
 
 package org.apache.geode.redis.general;
 
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import org.junit.After;
@@ -24,6 +27,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
 
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.GemFireCache;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.redis.GeodeRedisServer;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -35,13 +40,19 @@ public class ExpireAtIntegrationTest {
   private static GeodeRedisServer server;
   private long unixTimeStampInTheFutureInSeconds;
   private long unixTimeStampFromThePast = 0L;
-  String key = "key";
-  String value = "value";
+  private static String key = "key";
+  private static String value = "value";
+  private static GemFireCache cache;
 
   @BeforeClass
   public static void setUp() {
     int port = AvailablePortHelper.getRandomAvailableTCPPort();
 
+    CacheFactory cf = new CacheFactory();
+    cf.set(LOG_LEVEL, "error");
+    cf.set(MCAST_PORT, "0");
+    cf.set(LOCATORS, "");
+    cache = cf.create();
     server = new GeodeRedisServer("localhost", port);
     server.start();
     jedis = new Jedis("localhost", port, REDIS_CLIENT_TIMEOUT);
@@ -52,7 +63,6 @@ public class ExpireAtIntegrationTest {
     unixTimeStampInTheFutureInSeconds = (System.currentTimeMillis() / 1000) + 60;
   }
 
-
   @After
   public void testLevelTearDown() {
     jedis.flushAll();
@@ -61,6 +71,7 @@ public class ExpireAtIntegrationTest {
   @AfterClass
   public static void classLevelTearDown() {
     jedis.close();
+    cache.close();
     server.shutdown();
   }
 
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/DummySubscription.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/DummySubscription.java
new file mode 100644
index 0000000..8868ad4
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/DummySubscription.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import java.util.List;
+
+public class DummySubscription implements Subscription {
+  // Inert stub: every boolean query answers false and every other method returns null.
+  @Override
+  public boolean isEqualTo(Object channelOrPattern, Client client) {
+    return false;
+  }
+
+  @Override
+  public PublishResult publishMessage(String channel, byte[] message) {
+    return null;
+  }
+
+  @Override
+  public boolean matchesClient(Client client) {
+    return false;
+  }
+
+  @Override
+  public boolean matches(String channel) {
+    return false;
+  }
+
+  @Override
+  public List<Object> createResponse(String channel, byte[] message) {
+    return null;
+  }
+}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/SubscriptionsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/SubscriptionsIntegrationTest.java
new file mode 100644
index 0000000..dccebfc
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/SubscriptionsIntegrationTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.redis.internal;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+
+import io.netty.channel.Channel;
+import org.junit.AfterClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+public class SubscriptionsIntegrationTest {
+
+  private static final int ITERATIONS = 1000;
+
+  @ClassRule
+  public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  private Callable<Void> functionSpinner(Consumer<Void> consumer) {
+    return () -> { // task that invokes the consumer ITERATIONS times
+      for (int i = 0; i < ITERATIONS; i++) {
+        consumer.accept(null); // the Void argument carries no data
+        Thread.yield(); // scheduler hint; presumably to encourage thread interleaving
+      }
+      return null;
+    };
+  }
+
+  @AfterClass
+  public static void after() {
+    System.out.println("done"); // NOTE(review): prints a marker only; no cleanup here
+  }
+
+  @Test
+  public void add_doesNotThrowException_whenListIsConcurrentlyModified()
+      throws Exception {
+    final Subscriptions subscriptions = new Subscriptions();
+    // Two writers add to the same Subscriptions instance at the same time.
+    Callable<Void> addingCallable1 =
+        functionSpinner(x -> subscriptions.add(new DummySubscription()));
+    Callable<Void> addingCallable2 =
+        functionSpinner(x -> subscriptions.add(new DummySubscription()));
+
+    Future<Void> addingFuture1 = executor.submit(addingCallable1);
+    Future<Void> addingFuture2 = executor.submit(addingCallable2);
+
+    addingFuture1.get();
+    addingFuture2.get();
+    // Each writer performs ITERATIONS adds; none of them may be lost.
+    assertThat(subscriptions.size()).isEqualTo(ITERATIONS * 2);
+  }
+
+  @Test
+  public void exists_doesNotThrowException_whenListIsConcurrentlyModified()
+      throws Exception {
+    final Subscriptions subscriptions = new Subscriptions();
+    // One writer adds subscriptions while one reader calls exists() on the same instance.
+    Callable<Void> addingCallable =
+        functionSpinner(x -> subscriptions.add(new DummySubscription()));
+    Callable<Void> existsCallable =
+        functionSpinner(x -> subscriptions.exists("channel", mock(Client.class)));
+
+    Future<Void> addingFuture = executor.submit(addingCallable);
+    Future<Void> existsFuture = executor.submit(existsCallable);
+
+    addingFuture.get();
+    existsFuture.get();
+    // Only the writer adds, so exactly ITERATIONS subscriptions are expected.
+    assertThat(subscriptions.size()).isEqualTo(ITERATIONS);
+  }
+
+  @Test
+  public void findSubscriptionsByClient_doesNotThrowException_whenListIsConcurrentlyModified()
+      throws Exception {
+    final Subscriptions subscriptions = new Subscriptions();
+    // One writer adds while one reader looks up subscriptions by client.
+    Callable<Void> addingCallable =
+        functionSpinner(x -> subscriptions.add(new DummySubscription()));
+    Callable<Void> findSubscriptionsCallable =
+        functionSpinner(x -> subscriptions.findSubscriptions(mock(Client.class)));
+
+    Future<Void> addingFuture = executor.submit(addingCallable);
+    Future<Void> findSubscriptionsFuture = executor.submit(findSubscriptionsCallable);
+
+    addingFuture.get();
+    findSubscriptionsFuture.get();
+    // Only the writer adds, so exactly ITERATIONS subscriptions are expected.
+    assertThat(subscriptions.size()).isEqualTo(ITERATIONS);
+  }
+
+  @Test
+  public void findSubscriptionsByChannel_doesNotThrowException_whenListIsConcurrentlyModified()
+      throws Exception {
+    final Subscriptions subscriptions = new Subscriptions();
+    // One writer adds while one reader looks up subscriptions by channel name.
+    Callable<Void> addingCallable =
+        functionSpinner(x -> subscriptions.add(new DummySubscription()));
+    Callable<Void> findSubscriptionsCallable =
+        functionSpinner(x -> subscriptions.findSubscriptions("channel"));
+
+    Future<Void> addingFuture = executor.submit(addingCallable);
+    Future<Void> findSubscriptionsFuture = executor.submit(findSubscriptionsCallable);
+
+    addingFuture.get();
+    findSubscriptionsFuture.get();
+    // Only the writer adds, so exactly ITERATIONS subscriptions are expected.
+    assertThat(subscriptions.size()).isEqualTo(ITERATIONS);
+  }
+
+  @Test
+  public void removeByClient_doesNotThrowException_whenListIsConcurrentlyModified()
+      throws Exception {
+    final Subscriptions subscriptions = new Subscriptions();
+    // Pre-populate one "channel" subscription per client before the concurrent phase.
+    List<Client> clients = new LinkedList<>();
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+    for (int i = 0; i < ITERATIONS; i++) {
+      Client client = new Client(mock(Channel.class));
+      clients.add(client);
+      subscriptions.add(new ChannelSubscription(client, "channel", context));
+    }
+    // One task removes each client's subscriptions while another queries exists() for them.
+    Callable<Void> removeCallable = () -> {
+      clients.forEach(c -> subscriptions.remove(c));
+      return null;
+    };
+    Callable<Void> existsCallable = () -> {
+      clients.forEach(c -> subscriptions.exists("channel", c));
+      return null;
+    };
+
+    Future<Void> removeFuture = executor.submit(removeCallable);
+    Future<Void> existsFuture = executor.submit(existsCallable);
+
+    removeFuture.get();
+    existsFuture.get();
+    // remove() was called for every client, so no subscription is expected to remain.
+    assertThat(subscriptions.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void removeByChannelAndClient_doesNotThrowException_whenListIsConcurrentlyModified()
+      throws Exception {
+    final Subscriptions subscriptions = new Subscriptions();
+    // Pre-populate one "channel" subscription per client before the concurrent phase.
+    List<Client> clients = new LinkedList<>();
+    ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
+    for (int i = 0; i < ITERATIONS; i++) {
+      Client client = new Client(mock(Channel.class));
+      clients.add(client);
+      subscriptions.add(new ChannelSubscription(client, "channel", context));
+    }
+    // One task removes each (channel, client) pair while another queries exists() for it.
+    Callable<Void> removeCallable = () -> {
+      clients.forEach(c -> subscriptions.remove("channel", c));
+      return null;
+    };
+    Callable<Void> existsCallable = () -> {
+      clients.forEach(c -> subscriptions.exists("channel", c));
+      return null;
+    };
+
+    Future<Void> removeFuture = executor.submit(removeCallable);
+    Future<Void> existsFuture = executor.submit(existsCallable);
+
+    removeFuture.get();
+    existsFuture.get();
+    // remove() was called for every pair, so no subscription is expected to remain.
+    assertThat(subscriptions.size()).isEqualTo(0);
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
index b66770a..c24b450 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/Subscriptions.java
@@ -16,15 +16,15 @@
 
 package org.apache.geode.redis.internal;
 
-import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.stream.Collectors;
 
 /**
  * Class that manages both channel and pattern subscriptions.
  */
 public class Subscriptions {
-  private List<Subscription> subscriptions = new ArrayList<>();
+  private final List<Subscription> subscriptions = new CopyOnWriteArrayList<>();
 
   /**
    * Check whether a given client has already subscribed to a channel or pattern
@@ -56,7 +56,7 @@ public class Subscriptions {
    * @return a list of subscriptions
    */
   public List<Subscription> findSubscriptions(String channelOrPattern) {
-    return this.subscriptions.stream()
+    return subscriptions.stream()
         .filter(subscription -> subscription.matches(channelOrPattern))
         .collect(Collectors.toList());
   }
@@ -65,20 +65,27 @@ public class Subscriptions {
    * Add a new subscription
    */
   public void add(Subscription subscription) {
-    this.subscriptions.add(subscription);
+    subscriptions.add(subscription);
   }
 
   /**
    * Remove all subscriptions for a given client
    */
   public void remove(Client client) {
-    this.subscriptions.removeIf(subscription -> subscription.matchesClient(client));
+    subscriptions.removeIf(subscription -> subscription.matchesClient(client));
   }
 
   /**
    * Remove a single subscription
    */
   public void remove(Object channel, Client client) {
-    this.subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
+    subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
+  }
+
+  /**
+   * @return the total number of all local subscriptions
+   */
+  public int size() {
+    return subscriptions.size();
   }
 }