You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mc...@apache.org on 2019/04/30 17:22:57 UTC
[geode] branch develop updated: GEODE-6708: Ensuring single drainer
and preventing NPE
This is an automated email from the ASF dual-hosted git repository.
mcmellawatt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new bc2a2fa GEODE-6708: Ensuring single drainer and preventing NPE
bc2a2fa is described below
commit bc2a2fa5af374cfedfba4dc1abe6cbc2a7b719c8
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Thu Apr 25 17:19:31 2019 -0700
GEODE-6708: Ensuring single drainer and preventing NPE
---
.../cache/tier/sockets/CacheClientNotifier.java | 19 ++++-
.../ClientRegistrationEventQueueManager.java | 99 ++++++++++++++--------
.../ClientRegistrationEventQueueManagerTest.java | 59 ++++++++++++-
3 files changed, 136 insertions(+), 41 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index a7dc8a4..da141d2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -40,6 +40,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
@@ -171,15 +172,15 @@ public class CacheClientNotifier {
try {
if (isClientPermitted(clientRegistrationMetadata, clientProxyMembershipID)) {
registrationQueueManager.create(clientProxyMembershipID, new ConcurrentLinkedQueue<>(),
- new ReentrantReadWriteLock());
+ new ReentrantReadWriteLock(), new ReentrantLock());
try {
registerClientInternal(clientRegistrationMetadata, socket, isPrimary, acceptorId,
notifyBySubscription);
} finally {
- registrationQueueManager.drain(
- clientProxyMembershipID,
- this);
+ if (isProxyInitialized(clientProxyMembershipID)) {
+ registrationQueueManager.drain(clientProxyMembershipID, this);
+ }
}
}
} catch (final AuthenticationRequiredException ex) {
@@ -1220,6 +1221,16 @@ public class CacheClientNotifier {
}
/**
+ * Determines whether a client proxy has been initialized
+ *
+ * @param clientProxyMembershipID The client proxy membership ID
+ * @return Whether the client proxy is initialized
+ */
+ private boolean isProxyInitialized(final ClientProxyMembershipID clientProxyMembershipID) {
+ return getClientProxy(clientProxyMembershipID) != null;
+ }
+
+ /**
* Returns the <code>CacheClientProxy</code> associated to the membershipID *
*
* @return the <code>CacheClientProxy</code> associated to the membershipID
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
index 122a192..23d8ef5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
@@ -20,6 +20,7 @@ import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.Logger;
@@ -95,39 +96,58 @@ class ClientRegistrationEventQueueManager {
void drain(final ClientProxyMembershipID clientProxyMembershipID,
final CacheClientNotifier cacheClientNotifier) {
- // As an optimization, we drain as many events from the queue as we can
- // before taking out a lock to drain the remaining events
- if (logger.isDebugEnabled()) {
- logger.debug("Draining events from registration queue for client proxy "
- + clientProxyMembershipID
- + " without synchronization");
- }
-
- drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, cacheClientNotifier);
-
ClientRegistrationEventQueue registrationEventQueue =
registeringProxyEventQueues.get(clientProxyMembershipID);
- registrationEventQueue.lockForDraining();
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Draining remaining events from registration queue for client proxy "
- + clientProxyMembershipID + " with synchronization");
- }
+ if (registrationEventQueue != null) {
+ // It is possible that several client registration threads are active for the same
+ // ClientProxyMembershipID, in which case we only want a single drainer to drain
+ // and remove the queue.
+ registrationEventQueue.lockForSingleDrainer();
+ try {
+ // See if the queue is still available after acquiring the lock as it may have
+ // been removed from registeringProxyEventQueues by the previous thread
+ if (registeringProxyEventQueues.containsKey(clientProxyMembershipID)) {
+ // As an optimization, we drain as many events from the queue as we can
+ // before taking out a lock to drain the remaining events. When we lock for draining,
+ // it prevents additional events from being added to the queue while the queue is drained
+ // and removed.
+ if (logger.isDebugEnabled()) {
+ logger.debug("Draining events from registration queue for client proxy "
+ + clientProxyMembershipID
+ + " without synchronization");
+ }
+
+ drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, registrationEventQueue,
+ cacheClientNotifier);
+
+ // Prevents additional events from being added to the queue while we process and remove it
+ registrationEventQueue.lockForDraining();
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Draining remaining events from registration queue for client proxy "
+ + clientProxyMembershipID + " with synchronization");
+ }
- drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID, cacheClientNotifier);
+ drainEventsReceivedWhileRegisteringClient(clientProxyMembershipID,
+ registrationEventQueue,
+ cacheClientNotifier);
- registeringProxyEventQueues.remove(clientProxyMembershipID);
- } finally {
- registrationEventQueue.unlockForDraining();
+ registeringProxyEventQueues.remove(clientProxyMembershipID);
+ } finally {
+ registrationEventQueue.unlockForDraining();
+ }
+ }
+ } finally {
+ registrationEventQueue.unlockForSingleDrainer();
+ }
}
}
private void drainEventsReceivedWhileRegisteringClient(final ClientProxyMembershipID proxyID,
+ final ClientRegistrationEventQueue registrationEventQueue,
final CacheClientNotifier cacheClientNotifier) {
ClientRegistrationEvent queuedEvent;
- ClientRegistrationEventQueue registrationEventQueue = registeringProxyEventQueues.get(proxyID);
-
while ((queuedEvent = registrationEventQueue.poll()) != null) {
InternalCacheEvent internalCacheEvent = queuedEvent.internalCacheEvent;
Conflatable conflatable = queuedEvent.conflatable;
@@ -139,23 +159,28 @@ class ClientRegistrationEventQueueManager {
public ClientRegistrationEventQueue create(
final ClientProxyMembershipID clientProxyMembershipID,
final Queue<ClientRegistrationEvent> eventQueue,
- final ReadWriteLock putDrainLock) {
+ final ReadWriteLock eventAddDrainLock,
+ final ReentrantLock singleDrainerLock) {
final ClientRegistrationEventQueue clientRegistrationEventQueue =
new ClientRegistrationEventQueue(eventQueue,
- putDrainLock);
- registeringProxyEventQueues.put(clientProxyMembershipID,
+ eventAddDrainLock, singleDrainerLock);
+ registeringProxyEventQueues.putIfAbsent(clientProxyMembershipID,
clientRegistrationEventQueue);
return clientRegistrationEventQueue;
}
class ClientRegistrationEventQueue {
- Queue<ClientRegistrationEvent> eventQueue;
- ReadWriteLock readWriteLock;
+ private final Queue<ClientRegistrationEvent> eventQueue;
+ private final ReadWriteLock eventAddDrainLock;
+ private final ReentrantLock singleDrainerLock;
ClientRegistrationEventQueue(
- final Queue<ClientRegistrationEvent> eventQueue, final ReadWriteLock readWriteLock) {
+ final Queue<ClientRegistrationEvent> eventQueue,
+ final ReadWriteLock eventAddDrainLock,
+ final ReentrantLock singleDrainerLock) {
this.eventQueue = eventQueue;
- this.readWriteLock = readWriteLock;
+ this.eventAddDrainLock = eventAddDrainLock;
+ this.singleDrainerLock = singleDrainerLock;
}
boolean isEmpty() {
@@ -171,19 +196,27 @@ class ClientRegistrationEventQueueManager {
}
private void lockForDraining() {
- readWriteLock.writeLock().lock();
+ eventAddDrainLock.writeLock().lock();
}
private void unlockForDraining() {
- readWriteLock.writeLock().unlock();
+ eventAddDrainLock.writeLock().unlock();
}
private void lockForPutting() {
- readWriteLock.readLock().lock();
+ eventAddDrainLock.readLock().lock();
}
private void unlockForPutting() {
- readWriteLock.readLock().unlock();
+ eventAddDrainLock.readLock().unlock();
+ }
+
+ private void lockForSingleDrainer() {
+ singleDrainerLock.lock();
+ }
+
+ private void unlockForSingleDrainer() {
+ singleDrainerLock.unlock();
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
index 801d178..f9af401 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
@@ -27,7 +27,9 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.junit.Test;
@@ -68,7 +70,7 @@ public class ClientRegistrationEventQueueManagerTest {
});
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), mockPutDrainLock);
+ new ConcurrentLinkedQueue<>(), mockPutDrainLock, new ReentrantLock());
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
LocalRegion localRegion = mock(LocalRegion.class);
@@ -125,7 +127,7 @@ public class ClientRegistrationEventQueueManagerTest {
ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
+ new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new ReentrantLock());
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
@@ -170,7 +172,7 @@ public class ClientRegistrationEventQueueManagerTest {
ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), mockPutDrainLock);
+ new ConcurrentLinkedQueue<>(), mockPutDrainLock, new ReentrantLock());
InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
@@ -180,7 +182,7 @@ public class ClientRegistrationEventQueueManagerTest {
CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
CompletableFuture<Void> addEventsToQueueTask = CompletableFuture.runAsync(() -> {
- for (int i = 0; i < 100000; ++i) {
+ for (int numAdds = 0; numAdds < 100000; ++numAdds) {
// In thread one, we add events to the queue
clientRegistrationEventQueueManager
.add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
@@ -196,4 +198,53 @@ public class ClientRegistrationEventQueueManagerTest {
assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
}
+
+ @Test
+ public void twoThreadsRegisteringSameClientNoEventsLost()
+ throws ExecutionException, InterruptedException {
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+ new ClientRegistrationEventQueueManager();
+
+ InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+ when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
+
+ Conflatable conflatable = mock(Conflatable.class);
+ Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
+ CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+ ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
+
+ ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+ new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock(), new ReentrantLock());
+
+ for (int registrationIterations = 0; registrationIterations < 1000; ++registrationIterations) {
+ Runnable clientRegistrationSimulation = () -> {
+ for (int numAdds = 0; numAdds < getRandomNumberOfAdds(); ++numAdds) {
+        // Each registration thread adds a random number of events to the queue
+ clientRegistrationEventQueueManager
+ .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
+ }
+      // Both threads then race to drain the queue, exercising the single-drainer lock
+ clientRegistrationEventQueueManager.drain(clientProxyMembershipID, cacheClientNotifier);
+ };
+
+ CompletableFuture<Void> registrationFutureOne =
+ CompletableFuture.runAsync(clientRegistrationSimulation);
+ CompletableFuture<Void> registrationFutureTwo =
+ CompletableFuture.runAsync(clientRegistrationSimulation);
+
+ CompletableFuture.allOf(registrationFutureOne, registrationFutureTwo).get();
+
+ assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
+ }
+ }
+
+ /*
+ * This helps to create contention between registration threads during the drain phase
+ */
+ private static int getRandomNumberOfAdds() {
+ int min = 10_000;
+ int max = 50_000;
+ return ThreadLocalRandom.current().nextInt(min, max + 1);
+ }
}