You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/09/03 20:13:19 UTC

[GitHub] [geode] DonalEvans commented on a change in pull request #6814: GEODE-9347: optimize pubSub

DonalEvans commented on a change in pull request #6814:
URL: https://github.com/apache/geode/pull/6814#discussion_r702061394



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractClientSubscriptionManager.java
##########
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.geode.redis.internal.pubsub;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.apache.geode.redis.internal.netty.Client;
+
+abstract class AbstractClientSubscriptionManager<S extends Subscription>
+    implements ClientSubscriptionManager<S> {
+  private final Map<Client, S> map = new ConcurrentHashMap<>();

Review comment:
       Could this map be named something more descriptive?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ClientSubscriptionManager.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.geode.redis.internal.pubsub;
+
+import java.util.function.Consumer;
+
+import org.apache.geode.redis.internal.netty.Client;
+
+/**
+ * An instance of this interface keeps track of all the clients
+ * that have active subscriptions to the channel or pattern that
+ * this instance represents.
+ */
+interface ClientSubscriptionManager<S> {

Review comment:
       Some typos in this class. "Manager's" should be "Managers" throughout.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java
##########
@@ -53,49 +54,53 @@
     };
   }
 
+  final Subscriptions subscriptions = new Subscriptions();
+
+  private int dummyCount;

Review comment:
       Would it be worth resetting this value in a `@Before` method?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PunsubscribeExecutor.java
##########
@@ -16,61 +16,56 @@
 
 package org.apache.geode.redis.internal.executor.pubsub;
 
-import static org.apache.geode.redis.internal.netty.Coder.bytesToString;
-import static org.apache.geode.redis.internal.pubsub.Subscription.Type.PATTERN;
+import static java.util.Collections.singletonList;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bPUNSUBSCRIBE;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
-import org.apache.geode.redis.internal.executor.GlobPattern;
 import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Client;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
 public class PunsubscribeExecutor extends AbstractExecutor {
 
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
-
     List<byte[]> patternNames = extractPatternNames(command);
-    if (patternNames.isEmpty()) {
-      patternNames = context.getPubSub().findSubscriptionNames(context.getClient(), PATTERN);
-    }
-
     Collection<Collection<?>> response = punsubscribe(context, patternNames);
-
     return RedisResponse.flattenedArray(response);
   }
 
   private List<byte[]> extractPatternNames(Command command) {
     return command.getProcessedCommand().stream().skip(1).collect(Collectors.toList());
   }
 
+  private static final Collection<Collection<?>> EMPTY_RESULT = singletonList(createItem(null, 0));

Review comment:
       Could this constant be moved to the top of the class?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java
##########
@@ -26,33 +28,34 @@
 import org.apache.geode.redis.internal.netty.Client;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
+
+
 public abstract class AbstractSubscription implements Subscription {
   private static final Logger logger = LogService.getLogger();
-  private final Client client;
-  private final ExecutionHandlerContext context;
 
+  private final ExecutionHandlerContext context;
+  private final byte[] subscriptionName;
   // Before we are ready to publish we need to make sure that the response to the
   // SUBSCRIBE command has been sent back to the client.
   private final CountDownLatch readyForPublish = new CountDownLatch(1);
-  private final Subscriptions subscriptions;
   private boolean running = true;
 
-  AbstractSubscription(Client client, ExecutionHandlerContext context,
-      Subscriptions subscriptions) {
-    if (client == null) {
-      throw new IllegalArgumentException("client cannot be null");
-    }
+  AbstractSubscription(ExecutionHandlerContext context,
+      Subscriptions subscriptions, byte[] subscriptionName) {
     if (context == null) {
       throw new IllegalArgumentException("context cannot be null");
     }
     if (subscriptions == null) {
       throw new IllegalArgumentException("subscriptions cannot be null");
     }
-    this.client = client;
+    if (subscriptionName == null) {
+      throw new IllegalArgumentException("subscriptionName cannot be null");
+    }

Review comment:
       Can any of these ever actually be null here? It seems reasonable to assume that they shouldn't ever be, given that they're not something the user is passing in (except `subscriptionName` which should be checked for null in the executor). Maybe this would be somewhere that `@NotNull` annotations in the method signature would be useful?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscriptionManager.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.geode.redis.internal.pubsub;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.geode.redis.internal.executor.GlobPattern;
+import org.apache.geode.redis.internal.netty.Client;
+
+abstract class AbstractSubscriptionManager<S extends Subscription>
+    implements SubscriptionManager<S> {
+  protected final Map<SubscriptionId, ClientSubscriptionManager<S>> map =

Review comment:
       Could this have a more descriptive name?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/StringBytesGlossary.java
##########
@@ -259,4 +260,17 @@
   public static final byte[] bLEAST_MEMBER_NAME = new byte[] {-2};
 
   public static final byte[] bNEGATIVE_ZERO = stringToBytes("-0");
+
+  @Immutable
+  public static final byte[] bMESSAGE = Coder.stringToBytes("message");
+
+  @Immutable
+  public static final byte[] bPMESSAGE = Coder.stringToBytes("pmessage");
+
+  @Immutable
+  public static final byte[] bUNSUBSCRIBE = Coder.stringToBytes("unsubscribe");
+
+  @Immutable
+  public static final byte[] bPUNSUBSCRIBE = Coder.stringToBytes("punsubscribe");

Review comment:
       For consistency with other constants in this class, could the stings here be all uppercase, and could these be moved up to the "Redis Command constants" section of the file?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java
##########
@@ -26,33 +28,34 @@
 import org.apache.geode.redis.internal.netty.Client;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
+
+
 public abstract class AbstractSubscription implements Subscription {
   private static final Logger logger = LogService.getLogger();

Review comment:
       This logger is never used so can probably be removed.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java
##########
@@ -53,49 +54,53 @@
     };

Review comment:
       The `functionSpinner()` callable above this line is never used and can be removed. The same is true of the `ExecutorServiceRule`.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/UnsubscribeExecutor.java
##########
@@ -15,60 +15,56 @@
 
 package org.apache.geode.redis.internal.executor.pubsub;
 
-import static org.apache.geode.redis.internal.pubsub.Subscription.Type.CHANNEL;
+import static java.util.Collections.singletonList;
+import static org.apache.geode.redis.internal.netty.StringBytesGlossary.bUNSUBSCRIBE;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Client;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
 public class UnsubscribeExecutor extends AbstractExecutor {
 
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
-
     List<byte[]> channelNames = extractChannelNames(command);
-    if (channelNames.isEmpty()) {
-      channelNames = context.getPubSub().findSubscriptionNames(context.getClient(), CHANNEL);
-    }
-
     Collection<Collection<?>> response = unsubscribe(context, channelNames);
-
     return RedisResponse.flattenedArray(response);
   }
 
   private List<byte[]> extractChannelNames(Command command) {

Review comment:
       This method can probably be inlined. Also, it's possible that we don't need to allocate a new list for this, and can instead add some logic in the `unsubscribe()` method to just not treat the first element in the list returned from `command.getProcessedCommand()` as a channel name. Given that unsubscribe is unlikely to be a command that's called on a hot path, this might not be worth the effort though, as performance isn't so critical.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscriptionManager.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+package org.apache.geode.redis.internal.pubsub;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.geode.redis.internal.executor.GlobPattern;
+import org.apache.geode.redis.internal.netty.Client;
+
+abstract class AbstractSubscriptionManager<S extends Subscription>
+    implements SubscriptionManager<S> {
+  protected final Map<SubscriptionId, ClientSubscriptionManager<S>> map =
+      new ConcurrentHashMap<>();
+
+  protected ClientSubscriptionManager<S> getClientManager(byte[] channelOrPattern) {
+    SubscriptionId subscriptionId = new SubscriptionId(channelOrPattern);
+    return map.getOrDefault(subscriptionId, emptyClientManager());
+  }
+
+  @Override
+  public List<byte[]> getIds() {
+    final ArrayList<byte[]> result = new ArrayList<>(map.size());

Review comment:
       Could this be declared as `List<>` rather than the concrete class?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
##########
@@ -97,6 +97,7 @@ public PubSubImpl(Subscriptions subscriptions) {
     registerPublishFunction();

Review comment:
       I think it would be good to refactor the `registerPublishFunction()` method by pulling the anonymous `InternalFunction` that gets created there out into its own internal `RedisPubSubFunction` class which implements `DataSerializable` (or maybe `DataSerializableFixedID`, I don't know which is better here) rather than being Java serializable. At the very least, if Java serialization is used, a `serialVersionUID` should be added to prevent potential future issues with backwards compatibility.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
##########
@@ -96,104 +66,84 @@ boolean exists(Object channelOrPattern, Client client) {
    * @param pattern the glob pattern to search for
    */
   public List<byte[]> findChannelNames(byte[] pattern) {
-
-    GlobPattern globPattern = new GlobPattern(bytesToString(pattern));
-
-    return findChannelNames()
-        .stream()
-        .filter(name -> globPattern.matches(bytesToString(name)))
-        .collect(Collectors.toList());
+    return channelSubscriptions.getIds(pattern);
   }
 
   /**
-   * Return a list consisting of pairs {@code channelName, subscriptionCount}.
-   *
-   * @param names a list of the names to consider. This should not include any patterns.
+   * Return a count of all pattern subscriptions including duplicates.
    */
-  public List<Object> findNumberOfSubscribersPerChannel(List<byte[]> names) {
-    List<Object> result = new ArrayList<>();
-
-    names.forEach(name -> {
-      Long subscriptionCount = findSubscriptions(name)
-          .stream()
-          .filter(subscription -> subscription instanceof ChannelSubscription)
-          .count();
-
-      result.add(name);
-      result.add(subscriptionCount);
-    });
+  public int getPatternSubscriptionCount() {

Review comment:
       This method can be package protected instead of public, similar to the `getChannelSubscriptionCount()` method.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
##########
@@ -96,104 +66,84 @@ boolean exists(Object channelOrPattern, Client client) {
    * @param pattern the glob pattern to search for
    */
   public List<byte[]> findChannelNames(byte[] pattern) {
-
-    GlobPattern globPattern = new GlobPattern(bytesToString(pattern));
-
-    return findChannelNames()
-        .stream()
-        .filter(name -> globPattern.matches(bytesToString(name)))
-        .collect(Collectors.toList());
+    return channelSubscriptions.getIds(pattern);
   }
 
   /**
-   * Return a list consisting of pairs {@code channelName, subscriptionCount}.
-   *
-   * @param names a list of the names to consider. This should not include any patterns.
+   * Return a count of all pattern subscriptions including duplicates.
    */
-  public List<Object> findNumberOfSubscribersPerChannel(List<byte[]> names) {
-    List<Object> result = new ArrayList<>();
-
-    names.forEach(name -> {
-      Long subscriptionCount = findSubscriptions(name)
-          .stream()
-          .filter(subscription -> subscription instanceof ChannelSubscription)
-          .count();
-
-      result.add(name);
-      result.add(subscriptionCount);
-    });
+  public int getPatternSubscriptionCount() {
+    return patternSubscriptions.getSubscriptionCount();
+  }
 
-    return result;
+  @VisibleForTesting
+  int getChannelSubscriptionCount() {
+    return channelSubscriptions.getSubscriptionCount();
   }
 
-  /**
-   * Return a count of all pattern subscriptions including duplicates.
-   */
-  public long findNumberOfPatternSubscriptions() {
-    return subscriptions.stream()
-        .filter(subscription -> subscription instanceof PatternSubscription)
-        .count();
+  void add(ChannelSubscription subscription) {
+    channelSubscriptions.add(subscription);
   }
 
-  /**
-   * Add a new subscription
-   */
-  @VisibleForTesting
-  void add(Subscription subscription) {
-    subscriptions.add(subscription);
+  void add(PatternSubscription subscription) {
+    patternSubscriptions.add(subscription);
   }
 
   /**
    * Remove all subscriptions for a given client
    */
   public void remove(Client client) {
-    subscriptions.removeIf(subscription -> subscription.matchesClient(client));
-  }
-
-  /**
-   * Remove a single subscription
-   */
-  @VisibleForTesting
-  boolean remove(Object channel, Client client) {
-    return subscriptions.removeIf(subscription -> subscription.isEqualTo(channel, client));
+    channelSubscriptions.remove(client);
+    patternSubscriptions.remove(client);
+    client.clearSubscriptions();
   }
 
   /**
    * @return the total number of all local subscriptions
    */
   @VisibleForTesting
   int size() {
-    return subscriptions.size();
+    // this is only used by tests so performance is not an issue
+    return getChannelSubscriptionCount() + getPatternSubscriptionCount();
   }
 
-  public synchronized SubscribeResult subscribe(byte[] channel, ExecutionHandlerContext context,
-      Client client) {
-    Subscription createdSubscription = null;
-    if (!exists(channel, client)) {
-      createdSubscription = new ChannelSubscription(client, channel, context, this);
+  public SubscribeResult subscribe(byte[] channel, ExecutionHandlerContext context) {
+    final Client client = context.getClient();
+    ChannelSubscription createdSubscription = null;
+    if (client.addChannelSubscription(channel)) {
+      createdSubscription = new ChannelSubscription(channel, context, this);
       add(createdSubscription);
     }
-    long channelCount = findSubscriptions(client).size();
+    long channelCount = client.getSubscriptionCount();
     return new SubscribeResult(createdSubscription, channelCount, channel);
   }
 
-  public SubscribeResult psubscribe(byte[] patternBytes, ExecutionHandlerContext context,
-      Client client) {
-    GlobPattern pattern = new GlobPattern(bytesToString(patternBytes));
-    Subscription createdSubscription = null;
-    synchronized (this) {
-      if (!exists(pattern, client)) {
-        createdSubscription = new PatternSubscription(client, pattern, context, this);
+  public SubscribeResult psubscribe(byte[] patternBytes, ExecutionHandlerContext context) {
+    final Client client = context.getClient();
+    PatternSubscription createdSubscription = null;
+    if (client.addPatternSubscription(patternBytes)) {
+      boolean added = false;
+      try {
+        createdSubscription = new PatternSubscription(patternBytes, context, this);
         add(createdSubscription);
+        added = true;
+      } finally {
+        if (!added) {
+          // Must have had a problem parsing the pattern

Review comment:
       Is anything returned to the user indicating something went wrong if we hit a problem parsing the pattern here? It seems like any exception thrown due to parsing here would end up bubbling up to `ExecutionHandlerContext` where it'll result in a generic response of `SERVER_ERROR_MESSAGE`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org