You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2019/04/30 17:22:57 UTC

[geode] branch develop updated: GEODE-6708: Ensuring single drainer and preventing NPE

This is an automated email from the ASF dual-hosted git repository.

mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new bc2a2fa  GEODE-6708: Ensuring single drainer and preventing NPE
bc2a2fa is described below

commit bc2a2fa5af374cfedfba4dc1abe6cbc2a7b719c8
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Thu Apr 25 17:19:31 2019 -0700

    GEODE-6708: Ensuring single drainer and preventing NPE
---
 .../cache/tier/sockets/CacheClientNotifier.java    | 19 ++++-
 .../ClientRegistrationEventQueueManager.java       | 99 ++++++++++++++--------
 .../ClientRegistrationEventQueueManagerTest.java   | 59 ++++++++++++-
 3 files changed, 136 insertions(+), 41 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index a7dc8a4..da141d2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
@@ -171,15 +172,15 @@ public class CacheClientNotifier {
     try {
       if (isClientPermitted(clientRegistrationMetadata, clientProxyMembershipID)) {
         registrationQueueManager.create(clientProxyMembershipID, new ConcurrentLinkedQueue<>(),
-            new ReentrantReadWriteLock());
+            new ReentrantReadWriteLock(), new ReentrantLock());
 
         try {
           registerClientInternal(clientRegistrationMetadata, socket, isPrimary, acceptorId,
               notifyBySubscription);
         } finally {
-          registrationQueueManager.drain(
-              clientProxyMembershipID,
-              this);
+          if (isProxyInitialized(clientProxyMembershipID)) {
+            registrationQueueManager.drain(clientProxyMembershipID, this);
+          }
         }
       }
     } catch (final AuthenticationRequiredException ex) {
@@ -1220,6 +1221,16 @@ public class CacheClientNotifier {
   }
 
   /**
+   * Determines whether a client proxy has been initialized
+   *
+   * @param clientProxyMembershipID The client proxy membership ID
+   * @return Whether the client proxy is initialized
+   */
+  private boolean isProxyInitialized(final ClientProxyMembershipID clientProxyMembershipID) {
+    return getClientProxy(clientProxyMembershipID) != null;
+  }
+
+  /**
    * Returns the <code>CacheClientProxy</code> associated to the membershipID *
    *
    * @return the <code>CacheClientProxy</code> associated to the membershipID
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
index 122a192..23d8ef5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
@@ -20,6 +20,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.logging.log4j.Logger;
 
@@ -95,39 +96,58 @@ class ClientRegistrationEventQueueManager {
 
   void drain(final ClientProxyMembershipID clientProxyMembershipID,
       final CacheClientNotifier cacheClientNotifier) {
-    // As an optimization, we drain as many events from the queue as we can
-    // before taking out a lock to drain the remaining events
-    if (logger.isDebugEnabled()) {
-      logger.debug("Draining events from registration queue for client proxy "
-          + clientProxyMembershipID
-          + " without synchronization");
-    }
-
-    drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, cacheClientNotifier);
-
     ClientRegistrationEventQueue registrationEventQueue =
         registeringProxyEventQueues.get(clientProxyMembershipID);
 
-    registrationEventQueue.lockForDraining();
-    try {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Draining remaining events from registration queue for client proxy "
-            + clientProxyMembershipID + " with synchronization");
-      }
+    if (registrationEventQueue != null) {
+      // It is possible that several client registration threads are active for the same
+      // ClientProxyMembershipID, in which case we only want a single drainer to drain
+      // and remove the queue.
+      registrationEventQueue.lockForSingleDrainer();
+      try {
+        // See if the queue is still available after acquiring the lock as it may have
+        // been removed from registeringProxyEventQueues by the previous thread
+        if (registeringProxyEventQueues.containsKey(clientProxyMembershipID)) {
+          // As an optimization, we drain as many events from the queue as we can
+          // before taking out a lock to drain the remaining events. When we lock for draining,
+          // it prevents additional events from being added to the queue while the queue is drained
+          // and removed.
+          if (logger.isDebugEnabled()) {
+            logger.debug("Draining events from registration queue for client proxy "
+                + clientProxyMembershipID
+                + " without synchronization");
+          }
+
+          drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, registrationEventQueue,
+              cacheClientNotifier);
+
+          // Prevents additional events from being added to the queue while we process and remove it
+          registrationEventQueue.lockForDraining();
+          try {
+            if (logger.isDebugEnabled()) {
+              logger.debug("Draining remaining events from registration queue for client proxy "
+                  + clientProxyMembershipID + " with synchronization");
+            }
 
-      drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, cacheClientNotifier);
+            drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID,
+                registrationEventQueue,
+                cacheClientNotifier);
 
-      registeringProxyEventQueues.remove(clientProxyMembershipID);
-    } finally {
-      registrationEventQueue.unlockForDraining();
+            registeringProxyEventQueues.remove(clientProxyMembershipID);
+          } finally {
+            registrationEventQueue.unlockForDraining();
+          }
+        }
+      } finally {
+        registrationEventQueue.unlockForSingleDrainer();
+      }
     }
   }
 
   private void drainEventsReceivedWhileRegisteringClient(final ClientProxyMembershipID proxyID,
+      final ClientRegistrationEventQueue registrationEventQueue,
       final CacheClientNotifier cacheClientNotifier) {
     ClientRegistrationEvent queuedEvent;
-    ClientRegistrationEventQueue registrationEventQueue = registeringProxyEventQueues.get(proxyID);
-
     while ((queuedEvent = registrationEventQueue.poll()) != null) {
       InternalCacheEvent internalCacheEvent = queuedEvent.internalCacheEvent;
       Conflatable conflatable = queuedEvent.conflatable;
@@ -139,23 +159,28 @@ class ClientRegistrationEventQueueManager {
   public ClientRegistrationEventQueue create(
       final ClientProxyMembershipID clientProxyMembershipID,
       final Queue<ClientRegistrationEvent> eventQueue,
-      final ReadWriteLock putDrainLock) {
+      final ReadWriteLock eventAddDrainLock,
+      final ReentrantLock singleDrainerLock) {
     final ClientRegistrationEventQueue clientRegistrationEventQueue =
         new ClientRegistrationEventQueue(eventQueue,
-            putDrainLock);
-    registeringProxyEventQueues.put(clientProxyMembershipID,
+            eventAddDrainLock, singleDrainerLock);
+    registeringProxyEventQueues.putIfAbsent(clientProxyMembershipID,
         clientRegistrationEventQueue);
     return clientRegistrationEventQueue;
   }
 
   class ClientRegistrationEventQueue {
-    Queue<ClientRegistrationEvent> eventQueue;
-    ReadWriteLock readWriteLock;
+    private final Queue<ClientRegistrationEvent> eventQueue;
+    private final ReadWriteLock eventAddDrainLock;
+    private final ReentrantLock singleDrainerLock;
 
     ClientRegistrationEventQueue(
-        final Queue<ClientRegistrationEvent> eventQueue, final ReadWriteLock readWriteLock) {
+        final Queue<ClientRegistrationEvent> eventQueue,
+        final ReadWriteLock eventAddDrainLock,
+        final ReentrantLock singleDrainerLock) {
       this.eventQueue = eventQueue;
-      this.readWriteLock = readWriteLock;
+      this.eventAddDrainLock = eventAddDrainLock;
+      this.singleDrainerLock = singleDrainerLock;
     }
 
     boolean isEmpty() {
@@ -171,19 +196,27 @@ class ClientRegistrationEventQueueManager {
     }
 
     private void lockForDraining() {
-      readWriteLock.writeLock().lock();
+      eventAddDrainLock.writeLock().lock();
     }
 
     private void unlockForDraining() {
-      readWriteLock.writeLock().unlock();
+      eventAddDrainLock.writeLock().unlock();
     }
 
     private void lockForPutting() {
-      readWriteLock.readLock().lock();
+      eventAddDrainLock.readLock().lock();
     }
 
     private void unlockForPutting() {
-      readWriteLock.readLock().unlock();
+      eventAddDrainLock.readLock().unlock();
+    }
+
+    private void lockForSingleDrainer() {
+      singleDrainerLock.lock();
+    }
+
+    private void unlockForSingleDrainer() {
+      singleDrainerLock.unlock();
     }
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
index 801d178..f9af401 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
@@ -27,7 +27,9 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.junit.Test;
@@ -68,7 +70,7 @@ public class ClientRegistrationEventQueueManagerTest {
         });
 
     clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-        new ConcurrentLinkedQueue<>(), mockPutDrainLock);
+        new ConcurrentLinkedQueue<>(), mockPutDrainLock, new ReentrantLock());
 
     InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
     LocalRegion localRegion = mock(LocalRegion.class);
@@ -125,7 +127,7 @@ public class ClientRegistrationEventQueueManagerTest {
     ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
 
     clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-        new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
+        new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new ReentrantLock());
 
     InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
     when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
@@ -170,7 +172,7 @@ public class ClientRegistrationEventQueueManagerTest {
 
     ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
         clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-            new ConcurrentLinkedQueue<>(), mockPutDrainLock);
+            new ConcurrentLinkedQueue<>(), mockPutDrainLock, new ReentrantLock());
 
     InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
     when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
@@ -180,7 +182,7 @@ public class ClientRegistrationEventQueueManagerTest {
     CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
 
     CompletableFuture<Void> addEventsToQueueTask = CompletableFuture.runAsync(() -> {
-      for (int i = 0; i < 100000; ++i) {
+      for (int numAdds = 0; numAdds < 100000; ++numAdds) {
         // In thread one, we add events to the queue
         clientRegistrationEventQueueManager
             .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
@@ -196,4 +198,53 @@ public class ClientRegistrationEventQueueManagerTest {
 
     assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
   }
+
+  @Test
+  public void twoThreadsRegisteringSameClientNoEventsLost()
+      throws ExecutionException, InterruptedException {
+    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+        new ClientRegistrationEventQueueManager();
+
+    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+    when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
+
+    Conflatable conflatable = mock(Conflatable.class);
+    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
+    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
+
+    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+            new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new ReentrantLock());
+
+    for (int registrationIterations = 0; registrationIterations < 1000; ++registrationIterations) {
+      Runnable clientRegistrationSimulation = () -> {
+        for (int numAdds = 0; numAdds < getRandomNumberOfAdds(); ++numAdds) {
+          // In thread one, we add events to the queue
+          clientRegistrationEventQueueManager
+              .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
+        }
+        // In thread two, we drain events from the queue
+        clientRegistrationEventQueueManager.drain(clientProxyMembershipID, cacheClientNotifier);
+      };
+
+      CompletableFuture<Void> registrationFutureOne =
+          CompletableFuture.runAsync(clientRegistrationSimulation);
+      CompletableFuture<Void> registrationFutureTwo =
+          CompletableFuture.runAsync(clientRegistrationSimulation);
+
+      CompletableFuture.allOf(registrationFutureOne, registrationFutureTwo).get();
+
+      assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
+    }
+  }
+
+  /*
+   * This helps to create contention between registration threads during the drain phase
+   */
+  private static int getRandomNumberOfAdds() {
+    int min = 10_000;
+    int max = 50_000;
+    return ThreadLocalRandom.current().nextInt(min, max + 1);
+  }
 }