You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text copy.
Posted to commits@geode.apache.org by kl...@apache.org on 2021/01/26 21:39:40 UTC

[geode] branch develop updated: GEODE-8811: Fix new value in events for failing client (#5953)

This is an automated email from the ASF dual-hosted git repository.

klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f8987ed  GEODE-8811: Fix new value in events for failing client (#5953)
f8987ed is described below

commit f8987edd090558ba17f715e7e5fdb00e898e72eb
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Tue Jan 26 13:38:01 2021 -0800

    GEODE-8811: Fix new value in events for failing client (#5953)
    
    * Declare parameters as interfaces instead of implementations.
    * Improve unit testing of CacheClientNotifier.
    * Use MockitoRule with STRICT_STUBS.
---
 .../apache/geode/internal/cache/FilterProfile.java |  10 +
 .../geode/internal/cache/InternalRegion.java       |   3 +
 .../apache/geode/internal/cache/LocalRegion.java   |   1 +
 .../cache/tier/sockets/CacheClientNotifier.java    |  15 +-
 .../ClientRegistrationEventQueueManager.java       |  73 ++-
 .../tier/sockets/ClientUpdateMessageImpl.java      |  13 +-
 .../tier/sockets/CacheClientNotifierTest.java      | 510 +++++++++++----------
 .../ClientRegistrationEventQueueManagerTest.java   | 421 +++++++++--------
 .../java/org/apache/geode/test/fake/Fakes.java     |   2 +-
 9 files changed, 573 insertions(+), 475 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index c34e755..5c95fff 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -2160,6 +2160,16 @@ public class FilterProfile implements DataSerializableFixedID {
         this.wireIDs.remove(mappedId);
       }
     }
+
+    @Override
+    public String toString() {
+      return "IDMap{" +
+          "nextID=" + nextID +
+          ", realIDs=" + realIDs +
+          ", wireIDs=" + wireIDs +
+          ", hasLongID=" + hasLongID +
+          '}';
+    }
   }
 
   /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 876353f..432ecca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -27,6 +27,7 @@ import org.apache.geode.Statistics;
 import org.apache.geode.cache.CacheWriter;
 import org.apache.geode.cache.CacheWriterException;
 import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.InterestRegistrationEvent;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionAttributes;
@@ -449,6 +450,8 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
 
   CachePerfStats getRegionPerfStats();
 
+  void handleInterestEvent(InterestRegistrationEvent event);
+
   VersionedObjectList basicRemoveAll(Collection<Object> keys,
       DistributedRemoveAllOperation removeAllOp, List<VersionTag> retryVersions);
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 96b2738..1833039 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -8619,6 +8619,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     cmnClearRegion(rEvent, false/* cacheWrite */, false/* useRVV */);
   }
 
+  @Override
   public void handleInterestEvent(InterestRegistrationEvent event) {
     throw new UnsupportedOperationException(
         "Region interest registration is only supported for PartitionedRegions");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index aaa7dea..a3a67a3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -89,7 +89,6 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalCacheEvent;
 import org.apache.geode.internal.cache.InternalRegion;
-import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.RegionEventImpl;
 import org.apache.geode.internal.cache.ha.HAContainerMap;
 import org.apache.geode.internal.cache.ha.HAContainerRegion;
@@ -691,15 +690,16 @@ public class CacheClientNotifier {
       HAEventWrapper wrapper = new HAEventWrapper(clientMessage);
       wrapper.incrementPutInProgressCounter("notify clients");
       conflatable = wrapper;
-    }
-    if (!filterClients.isEmpty()) {
-      if (event.getOperation().isEntry()) {
+
+      // include new value in event if the entry is not a tombstone and there are clients
+      if (!filterClients.isEmpty() && event.getOperation().isEntry()) {
         EntryEventImpl entryEvent = (EntryEventImpl) event;
         entryEvent.exportNewValue(clientMessage);
       }
     }
 
-    clientRegistrationEventQueueManager.add(event, conflatable, filterClients, this);
+    // add event to temporary queue for clients in process of registering (if any)
+    clientRegistrationEventQueueManager.add(event, clientMessage, conflatable, filterClients, this);
 
     singletonRouteClientMessage(conflatable, filterClients);
 
@@ -1029,7 +1029,8 @@ public class CacheClientNotifier {
 
     // NOTE: If delta is non-null, value MUST be in Object form of type Delta.
     ClientUpdateMessageImpl clientUpdateMsg =
-        new ClientUpdateMessageImpl(operation, (LocalRegion) event.getRegion(), keyOfInterest, null,
+        new ClientUpdateMessageImpl(operation, (InternalRegion) event.getRegion(), keyOfInterest,
+            null,
             delta, (byte) 0x01, callbackArgument, membershipID, eventIdentifier, versionTag);
 
     if (isNetLoad) {
@@ -1692,7 +1693,7 @@ public class CacheClientNotifier {
   }
 
   protected void handleInterestEvent(InterestRegistrationEvent event) {
-    LocalRegion region = (LocalRegion) event.getRegion();
+    InternalRegion region = (InternalRegion) event.getRegion();
     region.handleInterestEvent(event);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
index e106462..48b6d1d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
@@ -12,9 +12,9 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache.tier.sockets;
 
+import java.util.Collection;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -22,12 +22,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.CacheEvent;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.FilterProfile;
 import org.apache.geode.internal.cache.FilterRoutingInfo;
 import org.apache.geode.internal.cache.InternalCacheEvent;
-import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
 /**
@@ -37,15 +39,25 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
  */
 public class ClientRegistrationEventQueueManager {
   private static final Logger logger = LogService.getLogger();
+
   private final Set<ClientRegistrationEventQueue> registeringProxyEventQueues =
       ConcurrentHashMap.newKeySet();
 
   void add(final InternalCacheEvent event,
+      final ClientUpdateMessageImpl clientMessage,
       final Conflatable conflatable,
       final Set<ClientProxyMembershipID> originalFilterClientIDs,
       final CacheClientNotifier cacheClientNotifier) {
-    if (registeringProxyEventQueues.isEmpty())
+    if (registeringProxyEventQueues.isEmpty()) {
       return;
+    }
+
+    if (originalFilterClientIDs.isEmpty()
+        && event.getOperation().isEntry()
+        && !(clientMessage instanceof ClientTombstoneMessage)) {
+      EntryEventImpl entryEvent = (EntryEventImpl) event;
+      entryEvent.exportNewValue(clientMessage);
+    }
 
     ClientRegistrationEvent clientRegistrationEvent =
         new ClientRegistrationEvent(event, conflatable);
@@ -109,12 +121,10 @@ public class ClientRegistrationEventQueueManager {
             + " without synchronization");
       }
 
-      CacheClientProxy cacheClientProxy = cacheClientNotifier
-          .getClientProxy(clientProxyMembershipID);
+      CacheClientProxy cacheClientProxy =
+          cacheClientNotifier.getClientProxy(clientProxyMembershipID);
 
-      drainEventsReceivedWhileRegisteringClient(
-          cacheClientProxy,
-          clientRegistrationEventQueue,
+      drainEventsReceivedWhileRegisteringClient(cacheClientProxy, clientRegistrationEventQueue,
           cacheClientNotifier);
 
       // Prevents additional events from being added to the queue while we process and remove it
@@ -126,9 +136,7 @@ public class ClientRegistrationEventQueueManager {
             + " with synchronization");
       }
 
-      drainEventsReceivedWhileRegisteringClient(
-          cacheClientProxy,
-          clientRegistrationEventQueue,
+      drainEventsReceivedWhileRegisteringClient(cacheClientProxy, clientRegistrationEventQueue,
           cacheClientNotifier);
     } finally {
       // The queue must be removed before attempting to release the drain lock
@@ -169,7 +177,7 @@ public class ClientRegistrationEventQueueManager {
         // and local filter info in order to do so. If any of these are null, then there is
         // no need to proceed as the client is not interested.
         FilterProfile filterProfile =
-            ((LocalRegion) internalCacheEvent.getRegion()).getFilterProfile();
+            ((InternalRegion) internalCacheEvent.getRegion()).getFilterProfile();
 
         if (filterProfile != null) {
           FilterRoutingInfo filterRoutingInfo =
@@ -217,10 +225,11 @@ public class ClientRegistrationEventQueueManager {
    * is interested in the event so we should deliver it.
    */
   private boolean eventNotInOriginalFilterClientIDs(final ClientProxyMembershipID proxyID,
-      final Set<ClientProxyMembershipID> newFilterClientIDs,
-      final Set<ClientProxyMembershipID> originalFilterClientIDs) {
+      final Collection<ClientProxyMembershipID> newFilterClientIDs,
+      final Collection<ClientProxyMembershipID> originalFilterClientIDs) {
     return originalFilterClientIDs == null
-        || (!originalFilterClientIDs.contains(proxyID) && newFilterClientIDs.contains(proxyID));
+        || !originalFilterClientIDs.contains(proxyID)
+            && newFilterClientIDs.contains(proxyID);
   }
 
   /**
@@ -230,7 +239,7 @@ public class ClientRegistrationEventQueueManager {
    *
    * @param event The InternalCacheEvent whose value will be copied to the heap if need be
    */
-  private void copyOffHeapToHeapForRegistrationQueue(final InternalCacheEvent event) {
+  private void copyOffHeapToHeapForRegistrationQueue(final CacheEvent event) {
     if (event.getOperation().isEntry()) {
       EntryEventImpl entryEvent = ((EntryEventImpl) event);
       entryEvent.copyOffHeapToHeap();
@@ -245,8 +254,8 @@ public class ClientRegistrationEventQueueManager {
     while ((queuedEvent = registrationEventQueue.poll()) != null) {
       InternalCacheEvent internalCacheEvent = queuedEvent.internalCacheEvent;
       Conflatable conflatable = queuedEvent.conflatable;
-      processEventAndDeliverConflatable(cacheClientProxy,
-          cacheClientNotifier, internalCacheEvent, conflatable, null);
+      processEventAndDeliverConflatable(cacheClientProxy, cacheClientNotifier, internalCacheEvent,
+          conflatable, null);
     }
   }
 
@@ -258,23 +267,34 @@ public class ClientRegistrationEventQueueManager {
    * info and determine if the client which was registering does have a CQ that matches or
    * has registered interest in the key.
    */
-  private class ClientRegistrationEvent {
+  @VisibleForTesting
+  static class ClientRegistrationEvent {
+
     private final Conflatable conflatable;
     private final InternalCacheEvent internalCacheEvent;
 
-    ClientRegistrationEvent(final InternalCacheEvent internalCacheEvent,
+    private ClientRegistrationEvent(final InternalCacheEvent internalCacheEvent,
         final Conflatable conflatable) {
       this.conflatable = conflatable;
       this.internalCacheEvent = internalCacheEvent;
     }
+
+    @Override
+    public String toString() {
+      return "ClientRegistrationEvent{" +
+          "conflatable=" + conflatable +
+          ", internalCacheEvent=" + internalCacheEvent +
+          '}';
+    }
   }
 
-  class ClientRegistrationEventQueue {
+  static class ClientRegistrationEventQueue {
+
     private final ClientProxyMembershipID clientProxyMembershipID;
     private final Queue<ClientRegistrationEvent> eventQueue;
     private final ReentrantReadWriteLock eventAddDrainLock;
 
-    ClientRegistrationEventQueue(
+    private ClientRegistrationEventQueue(
         final ClientProxyMembershipID clientProxyMembershipID,
         final Queue<ClientRegistrationEvent> eventQueue,
         final ReentrantReadWriteLock eventAddDrainLock) {
@@ -283,6 +303,15 @@ public class ClientRegistrationEventQueueManager {
       this.eventAddDrainLock = eventAddDrainLock;
     }
 
+    @Override
+    public String toString() {
+      return "ClientRegistrationEventQueue{" +
+          "clientProxyMembershipID=" + clientProxyMembershipID +
+          ", eventQueue=" + eventQueue +
+          ", eventAddDrainLock=" + eventAddDrainLock +
+          '}';
+    }
+
     public ClientProxyMembershipID getClientProxyMembershipID() {
       return clientProxyMembershipID;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index bfd6af4..aa60118 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -12,7 +12,6 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache.tier.sockets;
 
 import java.io.DataInput;
@@ -38,7 +37,7 @@ import org.apache.geode.internal.cache.CachedDeserializableFactory;
 import org.apache.geode.internal.cache.EntryEventImpl.NewValueImporter;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.WrappedCallbackArgument;
 import org.apache.geode.internal.cache.ha.HAContainerRegion;
 import org.apache.geode.internal.cache.tier.MessageType;
@@ -51,12 +50,10 @@ import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.internal.size.Sizeable;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 
-
 /**
- * Class <code>ClientUpdateMessageImpl</code> is a message representing a cache operation that is
+ * Class {@code ClientUpdateMessageImpl} is a message representing a cache operation that is
  * sent from a server to an interested client.
  *
- *
  * @since GemFire 4.2
  */
 public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, NewValueImporter {
@@ -161,14 +158,14 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
    * @param callbackArgument The callback argument
    * @param memberId membership id of the originator of the event
    */
-  public ClientUpdateMessageImpl(EnumListenerEvent operation, LocalRegion region,
+  public ClientUpdateMessageImpl(EnumListenerEvent operation, InternalRegion region,
       Object keyOfInterest, Object value, byte valueIsObject, Object callbackArgument,
       ClientProxyMembershipID memberId, EventID eventIdentifier) {
     this(operation, region, keyOfInterest, value, null, valueIsObject, callbackArgument, memberId,
         eventIdentifier, null);
   }
 
-  public ClientUpdateMessageImpl(EnumListenerEvent operation, LocalRegion region,
+  public ClientUpdateMessageImpl(EnumListenerEvent operation, InternalRegion region,
       Object keyOfInterest, Object value, byte[] delta, byte valueIsObject, Object callbackArgument,
       ClientProxyMembershipID memberId, EventID eventIdentifier, VersionTag versionTag) {
     this._operation = operation;
@@ -179,7 +176,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
     this._callbackArgument = callbackArgument;
     this._membershipId = memberId;
     this._eventIdentifier = eventIdentifier;
-    this._shouldConflate = (isUpdate() && region.getEnableConflation());
+    this._shouldConflate = isUpdate() && region.getEnableConflation();
     this.deltaBytes = delta;
     this.versionTag = versionTag;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
index 26291f9..b9bb56e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
@@ -14,333 +14,351 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.quality.Strictness.STRICT_STUBS;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.junit.After;
 import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
 
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.query.internal.cq.ServerCQ;
-import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.FilterProfile;
 import org.apache.geode.internal.cache.FilterRoutingInfo;
+import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalCacheEvent;
-import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.InternalRegion;
 import org.apache.geode.internal.cache.RegionQueueException;
+import org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager.ClientRegistrationEventQueue;
 import org.apache.geode.internal.statistics.StatisticsClock;
-import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.internal.statistics.StatisticsManager;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class CacheClientNotifierTest {
-  @Before
-  public void setup() {
+
+  private static final String CQ_NAME = "testCQ";
+  private static final long CQ_ID = 0;
+
+  private final AtomicReference<CountDownLatch> afterLatch =
+      new AtomicReference<>(new CountDownLatch(0));
+  private final AtomicReference<CountDownLatch> beforeLatch =
+      new AtomicReference<>(new CountDownLatch(0));
+
+  private CacheClientProxy cacheClientProxy;
+  private ClientProxyMembershipID clientProxyMembershipId;
+  private ClientRegistrationEventQueueManager clientRegistrationEventQueueManager;
+  private ClientRegistrationMetadata clientRegistrationMetadata;
+  private InternalCache internalCache;
+  private InternalDistributedSystem internalDistributedSystem;
+  private Socket socket;
+  private Statistics statistics;
+  private StatisticsManager statisticsManager;
+  private InternalRegion region;
+
+  private CacheClientNotifier cacheClientNotifier;
+
+  @Rule
+  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  @BeforeClass
+  public static void clearStatics() {
     // Perform cleanup on any singletons received from previous test runs, since the
     // CacheClientNotifier is a static and previous tests may not have cleaned up properly.
-    shutdownExistingCacheClientNotifier();
+    CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance();
+    if (cacheClientNotifier != null) {
+      cacheClientNotifier.shutdown(0);
+    }
+  }
+
+  @Before
+  public void setUp() {
+    cacheClientProxy = mock(CacheClientProxy.class);
+    clientProxyMembershipId = mock(ClientProxyMembershipID.class);
+    clientRegistrationEventQueueManager = mock(ClientRegistrationEventQueueManager.class);
+    clientRegistrationMetadata = mock(ClientRegistrationMetadata.class);
+    internalCache = mock(InternalCache.class);
+    internalDistributedSystem = mock(InternalDistributedSystem.class);
+    region = mock(InternalRegion.class);
+    socket = mock(Socket.class);
+    statistics = mock(Statistics.class);
+    statisticsManager = mock(StatisticsManager.class);
   }
 
   @After
   public void tearDown() {
-    shutdownExistingCacheClientNotifier();
+    beforeLatch.get().countDown();
+    afterLatch.get().countDown();
+
+    clearStatics();
   }
 
   @Test
   public void eventsInClientRegistrationQueueAreSentToClientAfterRegistrationIsComplete()
-      throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
-      InvocationTargetException, InterruptedException, ExecutionException {
-    InternalCache internalCache = Fakes.cache();
-    CacheServerStats cacheServerStats = mock(CacheServerStats.class);
-    Socket socket = mock(Socket.class);
-    ConnectionListener connectionListener = mock(ConnectionListener.class);
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
-    ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
-    ClientRegistrationMetadata clientRegistrationMetadata = mock(ClientRegistrationMetadata.class);
-    StatisticsClock statisticsClock = mock(StatisticsClock.class);
-
-    when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
-        clientProxyMembershipID);
-
-    CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
-        new ClientRegistrationEventQueueManager(), statisticsClock, cacheServerStats, 0, 0,
-        connectionListener, null, false);
-    final CacheClientNotifier cacheClientNotifierSpy = spy(cacheClientNotifier);
-
-    CountDownLatch waitForEventDispatchCountdownLatch = new CountDownLatch(1);
-    CountDownLatch waitForRegistrationCountdownLatch = new CountDownLatch(1);
-    ExecutorService registerAndNotifyExecutor = Executors.newFixedThreadPool(2);
-
-    try {
-      // We stub out the CacheClientNotifier.registerClientInternal() to do some "work" until
-      // a new event is received and queued, as triggered by the waitForEventDispatchCountdownLatch
-      doAnswer((i) -> {
-        when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
-        cacheClientNotifierSpy.addClientProxy(cacheClientProxy);
-        waitForRegistrationCountdownLatch.countDown();
-        waitForEventDispatchCountdownLatch.await();
-        return null;
-      }).when(cacheClientNotifierSpy).registerClientInternal(clientRegistrationMetadata, socket,
-          false, 0, true);
-      List<Callable<Void>> registerAndNotifyTasks = new ArrayList<>();
-
-      // In one thread, we register the new client which should create the temporary client
-      // registration event queue. Events will be passed to that queue while registration is
-      // underway. Once registration is complete, the queue is drained and the event is processed
-      // as normal.
-      registerAndNotifyTasks.add(() -> {
-        cacheClientNotifierSpy.registerClient(clientRegistrationMetadata, socket, false, 0, true);
-        return null;
-      });
-
-      // In a second thread, we mock the arrival of a new event. We want to ensure this event
-      // goes into the temporary client registration event queue. To do that, we wait on the
-      // waitForRegistrationCountdownLatch until registration is underway and the temp queue is
-      // created. Once it is, we process the event and notify clients, which should add the event
-      // to the temp queue. Finally, we resume registration and after it is complete, we verify
-      // that the event was drained, processed, and "delivered" (note message delivery is mocked
-      // and results in a no-op).
-      registerAndNotifyTasks.add(() -> {
-        try {
-          waitForRegistrationCountdownLatch.await();
-
-          InternalCacheEvent internalCacheEvent =
-              createMockInternalCacheEvent(clientProxyMembershipID, clientUpdateMessage,
-                  cacheClientNotifierSpy);
-
-          CacheClientNotifier.notifyClients(internalCacheEvent, clientUpdateMessage);
-        } finally {
-          // Ensure we always countdown so if the test fails it won't hang due to
-          // awaiting on this countdown latch.
-          waitForEventDispatchCountdownLatch.countDown();
-        }
-        return null;
-      });
-
-      final List<Future<Void>> futures =
-          registerAndNotifyExecutor.invokeAll(registerAndNotifyTasks);
-
-      for (final Future future : futures) {
-        future.get();
-      }
-    } finally {
-      // To prevent not cleaning up test resources in case an unexpected exception occurs
-      waitForEventDispatchCountdownLatch.countDown();
-      waitForRegistrationCountdownLatch.countDown();
-      registerAndNotifyExecutor.shutdownNow();
-      cacheClientNotifier.shutdown(0);
+      throws Exception {
+    // this test requires real impl instance of ClientRegistrationEventQueueManager
+    clientRegistrationEventQueueManager = new ClientRegistrationEventQueueManager();
+
+    when(cacheClientProxy.getProxyID())
+        .thenReturn(clientProxyMembershipId);
+    when(clientRegistrationMetadata.getClientProxyMembershipID())
+        .thenReturn(clientProxyMembershipId);
+    when(internalCache.getCancelCriterion())
+        .thenReturn(mock(CancelCriterion.class));
+    when(internalCache.getCCPTimer())
+        .thenReturn(mock(SystemTimer.class));
+    when(internalCache.getInternalDistributedSystem())
+        .thenReturn(internalDistributedSystem);
+    when(internalDistributedSystem.getStatisticsManager())
+        .thenReturn(statisticsManager);
+    when(statisticsManager.createAtomicStatistics(any(), any()))
+        .thenReturn(statistics);
+
+    cacheClientNotifier = spy(CacheClientNotifier.getInstance(internalCache,
+        clientRegistrationEventQueueManager, mock(StatisticsClock.class),
+        mock(CacheServerStats.class), 0, 0, mock(ConnectionListener.class), null, false));
+
+    beforeLatch.set(new CountDownLatch(1));
+    afterLatch.set(new CountDownLatch(1));
+
+    // We stub out the CacheClientNotifier.registerClientInternal() to do some "work" until
+    // a new event is received and queued, as triggered by the afterLatch
+    doAnswer(invocation -> {
+      cacheClientNotifier.addClientProxy(cacheClientProxy);
+      beforeLatch.get().countDown();
+      afterLatch.get().await();
+      return null;
+    })
+        .when(cacheClientNotifier)
+        .registerClientInternal(clientRegistrationMetadata, socket, false, 0, true);
+
+    Collection<Callable<Void>> tasks = new ArrayList<>();
+
+    // In one thread, we register the new client which should create the temporary client
+    // registration event queue. Events will be passed to that queue while registration is
+    // underway. Once registration is complete, the queue is drained and the event is processed
+    // as normal.
+    tasks.add(() -> {
+      cacheClientNotifier.registerClient(clientRegistrationMetadata, socket, false, 0, true);
+      return null;
+    });
+
+    // In a second thread, we mock the arrival of a new event. We want to ensure this event
+    // goes into the temporary client registration event queue. To do that, we wait on the
+    // beforeLatch until registration is underway and the temp queue is
+    // created. Once it is, we process the event and notify clients, which should add the event
+    // to the temp queue. Finally, we resume registration and after it is complete, we verify
+    // that the event was drained, processed, and "delivered" (note message delivery is mocked
+    // and results in a no-op).
+    tasks.add(() -> {
+      beforeLatch.get().await();
+
+      InternalCacheEvent internalCacheEvent = internalCacheEvent(clientProxyMembershipId);
+      ClientUpdateMessageImpl clientUpdateMessageImpl = mock(ClientUpdateMessageImpl.class);
+      CacheClientNotifier.notifyClients(internalCacheEvent, clientUpdateMessageImpl);
+
+      afterLatch.get().countDown();
+      return null;
+    });
+
+    for (Future future : executorServiceRule.getExecutorService().invokeAll(tasks)) {
+      future.get();
     }
 
-    verify(cacheClientProxy, times(1)).deliverMessage(isA(HAEventWrapper.class));
+    verify(cacheClientProxy).deliverMessage(any());
   }
 
   @Test
-  public void initializingMessageShouldntSerializeValuePrematurely() throws Exception {
-    InternalCache internalCache = Fakes.cache();
-    CacheServerStats cacheServerStats = mock(CacheServerStats.class);
-    ConnectionListener connectionListener = mock(ConnectionListener.class);
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-    ClientRegistrationMetadata clientRegistrationMetadata = mock(ClientRegistrationMetadata.class);
-    StatisticsClock statisticsClock = mock(StatisticsClock.class);
-
-    when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
-        clientProxyMembershipID);
-
-    CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
-        new ClientRegistrationEventQueueManager(), statisticsClock, cacheServerStats, 0, 0,
-        connectionListener, null, false);
-    LocalRegion region = mock(LocalRegion.class);
-
-    EntryEventImpl entryEvent = mock(EntryEventImpl.class);
-    when(entryEvent.getEventType()).thenReturn(EnumListenerEvent.AFTER_CREATE);
-    when(entryEvent.getOperation()).thenReturn(Operation.CREATE);
-    when(entryEvent.getRegion()).thenReturn(region);
-    cacheClientNotifier.constructClientMessage(entryEvent);
-    verify(entryEvent, times(0)).exportNewValue(any());
+  public void initializingMessageDoesNotSerializeValuePrematurely() {
+    // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+    EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+
+    when(entryEventImpl.getEventType())
+        .thenReturn(EnumListenerEvent.AFTER_CREATE);
+    when(entryEventImpl.getOperation())
+        .thenReturn(Operation.CREATE);
+    when(entryEventImpl.getRegion())
+        .thenReturn(region);
+    when(internalCache.getCCPTimer())
+        .thenReturn(mock(SystemTimer.class));
+    when(internalCache.getInternalDistributedSystem())
+        .thenReturn(internalDistributedSystem);
+    when(internalDistributedSystem.getStatisticsManager())
+        .thenReturn(statisticsManager);
+    when(statisticsManager.createAtomicStatistics(any(), any()))
+        .thenReturn(statistics);
+
+    cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+        mock(ClientRegistrationEventQueueManager.class), mock(StatisticsClock.class),
+        mock(CacheServerStats.class), 0, 0, mock(ConnectionListener.class), null, false);
+
+    cacheClientNotifier.constructClientMessage(entryEventImpl);
+
+    verify(entryEventImpl, never()).exportNewValue(any());
   }
 
   @Test
-  public void clientRegistrationFailsQueueStillDrained()
-      throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
-      IllegalAccessException, IOException {
-    InternalCache internalCache = Fakes.cache();
-    CacheServerStats cacheServerStats = mock(CacheServerStats.class);
-    Socket socket = mock(Socket.class);
-    ConnectionListener connectionListener = mock(ConnectionListener.class);
-    ClientRegistrationMetadata clientRegistrationMetadata = mock(ClientRegistrationMetadata.class);
-    StatisticsClock statisticsClock = mock(StatisticsClock.class);
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
-        mock(ClientRegistrationEventQueueManager.class);
-    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
-        mock(ClientRegistrationEventQueueManager.ClientRegistrationEventQueue.class);
-
-    when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
-        clientProxyMembershipID);
-    when(clientRegistrationEventQueueManager.create(eq(clientProxyMembershipID), any(), any()))
-        .thenReturn(clientRegistrationEventQueue);
-
-    CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
-        clientRegistrationEventQueueManager, statisticsClock, cacheServerStats, 0, 0,
-        connectionListener, null, false);
-    CacheClientNotifier cacheClientNotifierSpy = spy(cacheClientNotifier);
+  public void clientRegistrationFailsQueueStillDrained() throws Exception {
+    ClientRegistrationEventQueue clientRegistrationEventQueue =
+        mock(ClientRegistrationEventQueue.class);
 
-    doAnswer((i) -> {
-      throw new RegionQueueException();
-    }).when(cacheClientNotifierSpy).registerClientInternal(clientRegistrationMetadata, socket,
-        false, 0, true);
-
-    assertThatThrownBy(() -> cacheClientNotifierSpy.registerClient(clientRegistrationMetadata,
-        socket, false, 0, true))
-            .isInstanceOf(IOException.class);
-
-    verify(clientRegistrationEventQueueManager, times(1)).create(
-        eq(clientProxyMembershipID), any(), any());
-
-    verify(clientRegistrationEventQueueManager, times(1)).drain(
-        eq(clientRegistrationEventQueue),
-        eq(cacheClientNotifierSpy));
-  }
-
-  private InternalCacheEvent createMockInternalCacheEvent(
-      final ClientProxyMembershipID clientProxyMembershipID,
-      final ClientUpdateMessageImpl clientUpdateMessage,
-      final CacheClientNotifier cacheClientNotifierSpy) {
-    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
-
-    DistributedRegion region = mock(DistributedRegion.class);
-    when(internalCacheEvent.getRegion()).thenReturn(region);
-
-    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
-    final Long cqId = 0L;
-    final String cqName = "testCQ";
-
-    HashMap cqs = new HashMap<Long, Integer>() {
-      {
-        put(cqId, 123);
-      }
-    };
-    when(filterInfo.getCQs()).thenReturn(cqs);
-    when(internalCacheEvent.getLocalFilterInfo()).thenReturn(
-        filterInfo);
-    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
-
-    FilterProfile filterProfile = mock(FilterProfile.class);
-    when(filterProfile.getRealCqID(cqId)).thenReturn(cqName);
-
-    ServerCQ serverCQ = mock(ServerCQ.class);
-    when(serverCQ.getClientProxyId()).thenReturn(clientProxyMembershipID);
-    when(filterProfile.getCq(cqName)).thenReturn(serverCQ);
-    when(region.getFilterProfile()).thenReturn(filterProfile);
-
-    FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
-    when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(filterInfo);
-    when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
-        .thenReturn(filterRoutingInfo);
-    doReturn(clientUpdateMessage).when(cacheClientNotifierSpy)
-        .constructClientMessage(internalCacheEvent);
-
-    return internalCacheEvent;
+    when(clientRegistrationEventQueueManager.create(eq(clientProxyMembershipId), any(), any()))
+        .thenReturn(clientRegistrationEventQueue);
+    when(clientRegistrationMetadata.getClientProxyMembershipID())
+        .thenReturn(clientProxyMembershipId);
+    when(internalCache.getCCPTimer())
+        .thenReturn(mock(SystemTimer.class));
+    when(internalCache.getInternalDistributedSystem())
+        .thenReturn(internalDistributedSystem);
+    when(internalDistributedSystem.getStatisticsManager())
+        .thenReturn(statisticsManager);
+    when(statisticsManager.createAtomicStatistics(any(), any()))
+        .thenReturn(statistics);
+
+    cacheClientNotifier = spy(CacheClientNotifier.getInstance(internalCache,
+        clientRegistrationEventQueueManager, mock(StatisticsClock.class),
+        mock(CacheServerStats.class), 0, 0, mock(ConnectionListener.class), null, false));
+
+    doThrow(new RegionQueueException("thrown during client registration"))
+        .when(cacheClientNotifier)
+        .registerClientInternal(clientRegistrationMetadata, socket, false, 0, true);
+
+    Throwable thrown = catchThrowable(() -> {
+      cacheClientNotifier.registerClient(clientRegistrationMetadata, socket, false, 0, true);
+    });
+    assertThat(thrown).isInstanceOf(IOException.class);
+
+    verify(clientRegistrationEventQueueManager)
+        .create(eq(clientProxyMembershipId), any(), any());
+
+    verify(clientRegistrationEventQueueManager)
+        .drain(eq(clientRegistrationEventQueue), eq(cacheClientNotifier));
   }
 
   @Test
   public void testSingletonHasClientProxiesFalseNoCCN() {
-    assertFalse(CacheClientNotifier.singletonHasClientProxies());
+    assertThat(CacheClientNotifier.singletonHasClientProxies()).isFalse();
   }
 
   @Test
   public void testSingletonHasClientProxiesFalseNoProxy() {
-    InternalCache internalCache = Fakes.cache();
+    when(internalCache.getCCPTimer())
+        .thenReturn(mock(SystemTimer.class));
 
-    CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(internalCache,
-            mock(ClientRegistrationEventQueueManager.class),
-            mock(StatisticsClock.class),
-            mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
-
-    assertFalse(CacheClientNotifier.singletonHasClientProxies());
-    ccn.shutdown(111);
+    cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+        mock(ClientRegistrationEventQueueManager.class), mock(StatisticsClock.class),
+        mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
 
+    assertThat(CacheClientNotifier.singletonHasClientProxies()).isFalse();
   }
 
   @Test
   public void testSingletonHasClientProxiesTrue() {
-    InternalCache internalCache = Fakes.cache();
-    CacheClientProxy proxy = mock(CacheClientProxy.class);
+    when(cacheClientProxy.getAcceptorId())
+        .thenReturn(111L);
+    when(cacheClientProxy.getProxyID())
+        .thenReturn(mock(ClientProxyMembershipID.class));
+    when(internalCache.getCCPTimer())
+        .thenReturn(mock(SystemTimer.class));
 
-    CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(internalCache,
-            mock(ClientRegistrationEventQueueManager.class),
-            mock(StatisticsClock.class),
-            mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
+    cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+        mock(ClientRegistrationEventQueueManager.class), mock(StatisticsClock.class),
+        mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
 
-    when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
-    ccn.addClientProxy(proxy);
+    cacheClientNotifier.addClientProxy(cacheClientProxy);
 
     // check ClientProxy Map is not empty
-    assertTrue(CacheClientNotifier.singletonHasClientProxies());
-
-    when(proxy.getAcceptorId()).thenReturn(Long.valueOf(111));
-    ccn.shutdown(111);
+    assertThat(CacheClientNotifier.singletonHasClientProxies()).isTrue();
   }
 
   @Test
   public void testSingletonHasInitClientProxiesTrue() {
-    InternalCache internalCache = Fakes.cache();
-    CacheClientProxy proxy = mock(CacheClientProxy.class);
+    when(cacheClientProxy.getAcceptorId())
+        .thenReturn(111L);
+    when(cacheClientProxy.getProxyID())
+        .thenReturn(mock(ClientProxyMembershipID.class));
+    when(internalCache.getCCPTimer())
+        .thenReturn(mock(SystemTimer.class));
 
-    CacheClientNotifier ccn =
-        CacheClientNotifier.getInstance(internalCache,
-            mock(ClientRegistrationEventQueueManager.class),
-            mock(StatisticsClock.class),
-            mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
+    cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+        mock(ClientRegistrationEventQueueManager.class), mock(StatisticsClock.class),
+        mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
 
-    when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
-    ccn.addClientInitProxy(proxy);
+    cacheClientNotifier.addClientInitProxy(cacheClientProxy);
 
     // check InitClientProxy Map is not empty
-    assertTrue(CacheClientNotifier.singletonHasClientProxies());
+    assertThat(CacheClientNotifier.singletonHasClientProxies()).isTrue();
 
-    ccn.addClientProxy(proxy);
+    cacheClientNotifier.addClientProxy(cacheClientProxy);
 
     // check ClientProxy Map is not empty
-    assertTrue(CacheClientNotifier.singletonHasClientProxies());
-
-    when(proxy.getAcceptorId()).thenReturn(Long.valueOf(111));
-    ccn.shutdown(111);
+    assertThat(CacheClientNotifier.singletonHasClientProxies()).isTrue();
   }
 
-  private void shutdownExistingCacheClientNotifier() {
-    CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance();
-    if (cacheClientNotifier != null) {
-      cacheClientNotifier.shutdown(0);
-    }
+  private InternalCacheEvent internalCacheEvent(ClientProxyMembershipID clientProxyMembershipID) {
+    FilterInfo filterInfo = mock(FilterInfo.class);
+    FilterProfile filterProfile = mock(FilterProfile.class);
+    FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
+    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+    ServerCQ serverCQ = mock(ServerCQ.class);
+
+    HashMap<Long, Integer> cqs = new HashMap<>();
+    cqs.put(CQ_ID, 123);
+
+    when(filterInfo.getCQs())
+        .thenReturn(cqs);
+    when(filterProfile.getCq(CQ_NAME))
+        .thenReturn(serverCQ);
+    when(filterProfile.getRealCqID(CQ_ID))
+        .thenReturn(CQ_NAME);
+    when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
+        .thenReturn(filterRoutingInfo);
+    when(filterRoutingInfo.getLocalFilterInfo())
+        .thenReturn(filterInfo);
+    when(internalCacheEvent.getRegion())
+        .thenReturn(region);
+    when(internalCacheEvent.getLocalFilterInfo())
+        .thenReturn(filterInfo);
+    when(internalCacheEvent.getOperation())
+        .thenReturn(mock(Operation.class));
+    when(region.getFilterProfile())
+        .thenReturn(filterProfile);
+    when(serverCQ.getClientProxyId())
+        .thenReturn(clientProxyMembershipID);
+
+    return internalCacheEvent;
   }
+
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
index 7ef75fe..150a763 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
@@ -12,122 +12,152 @@
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.geode.internal.cache.tier.sockets;
 
+import static java.util.Collections.emptySet;
+import static org.apache.geode.internal.util.CollectionUtils.asSet;
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowable;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.quality.Strictness.STRICT_STUBS;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;
 
 import org.apache.geode.cache.Operation;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.FilterProfile;
 import org.apache.geode.internal.cache.FilterRoutingInfo;
+import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
 import org.apache.geode.internal.cache.InternalCacheEvent;
-import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager.ClientRegistrationEventQueue;
 
 public class ClientRegistrationEventQueueManagerTest {
-  @Test
-  public void messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient() {
-    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
-        new ClientRegistrationEventQueueManager();
 
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-
-    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
-        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-            new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
+  private CacheClientNotifier cacheClientNotifier;
+  private CacheClientProxy cacheClientProxy;
+  private ClientProxyMembershipID clientProxyMembershipId;
+  private ClientUpdateMessageImpl clientUpdateMessage;
+  private FilterInfo filterInfo;
+  private FilterProfile filterProfile;
+  private FilterRoutingInfo filterRoutingInfo;
+  private InternalCacheEvent internalCacheEvent;
+  private InternalRegion internalRegion;
+  private Operation operation;
+
+  @Rule
+  public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
+
+  @Before
+  public void setUp() {
+    cacheClientNotifier = mock(CacheClientNotifier.class);
+    cacheClientProxy = mock(CacheClientProxy.class);
+    clientProxyMembershipId = mock(ClientProxyMembershipID.class);
+    clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+    filterInfo = mock(FilterInfo.class);
+    filterProfile = mock(FilterProfile.class);
+    filterRoutingInfo = mock(FilterRoutingInfo.class);
+    internalCacheEvent = mock(InternalCacheEvent.class);
+    internalRegion = mock(InternalRegion.class);
+    operation = mock(Operation.class);
+  }
 
-    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
-    LocalRegion localRegion = mock(LocalRegion.class);
-    FilterProfile filterProfile = mock(FilterProfile.class);
-    FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
-    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+  @Test
+  public void messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient() {
+    // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+    EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
 
-    when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
-        filterInfo);
-    when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
+    when(cacheClientNotifier.getClientProxy(clientProxyMembershipId))
+        .thenReturn(cacheClientProxy);
+    when(cacheClientNotifier.getFilterClientIDs(entryEventImpl, filterProfile, filterInfo,
+        clientUpdateMessage))
+            .thenReturn(asSet(clientProxyMembershipId));
+    when(cacheClientProxy.getProxyID())
+        .thenReturn(clientProxyMembershipId);
+    when(entryEventImpl.getOperation())
+        .thenReturn(operation);
+    when(entryEventImpl.getRegion())
+        .thenReturn(internalRegion);
+    when(filterProfile.getFilterRoutingInfoPart2(null, entryEventImpl, true))
         .thenReturn(filterRoutingInfo);
-    when(localRegion.getFilterProfile()).thenReturn(filterProfile);
-    when(internalCacheEvent.getRegion()).thenReturn(localRegion);
-    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
+    when(filterRoutingInfo.getLocalFilterInfo())
+        .thenReturn(filterInfo);
+    when(internalRegion.getFilterProfile())
+        .thenReturn(filterProfile);
+    when(operation.isEntry())
+        .thenReturn(true);
 
-    ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+        new ClientRegistrationEventQueueManager();
 
-    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
-    Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
-    recalculatedFilterClientIDs.add(clientProxyMembershipID);
-    when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile, filterInfo,
-        clientUpdateMessage))
-            .thenReturn(recalculatedFilterClientIDs);
-    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
-    when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
-    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+    ClientRegistrationEventQueue clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(clientProxyMembershipId,
+            new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
 
     // Create empty filter client IDs produced by the "normal" put processing path, so we can test
     // that the event is still delivered if the client finished registering and needs the event.
-    Set<ClientProxyMembershipID> normalPutFilterClientIDs = new HashSet<>();
 
-    clientRegistrationEventQueueManager
-        .add(internalCacheEvent, clientUpdateMessage, normalPutFilterClientIDs,
-            cacheClientNotifier);
+    clientRegistrationEventQueueManager.add(entryEventImpl, clientUpdateMessage,
+        clientUpdateMessage, emptySet(), cacheClientNotifier);
 
     clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
 
     // The client update message should still be delivered because it is now part of the
     // filter clients interested in this event, despite having not been included in the original
     // filter info in the "normal" put processing path.
-    verify(cacheClientProxy, times(1)).deliverMessage(clientUpdateMessage);
+    verify(cacheClientProxy).deliverMessage(clientUpdateMessage);
   }
 
   @Test
   public void clientRemovedFromFilterClientsListIfEventAddedToRegistrationQueue() {
+    // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+    EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+
+    when(entryEventImpl.getOperation())
+        .thenReturn(operation);
+    when(operation.isEntry())
+        .thenReturn(true);
+
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-
-    clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+    clientRegistrationEventQueueManager.create(clientProxyMembershipId,
         new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
 
-    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
-    when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
-    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
-
-    Conflatable conflatable = mock(Conflatable.class);
-
     // Add the registering client to the filter clients. This can happen if the filter info is
     // received but the client is not completely registered yet (queue GII has not been completed).
     // In that case, we want to remove the client from the filter IDs set and add the event
     // to the client's registration queue.
-    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
-    filterClientIDs.add(clientProxyMembershipID);
-
-    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+    Set<ClientProxyMembershipID> filterClientIds = asSet(clientProxyMembershipId);
 
-    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
-        cacheClientNotifier);
+    clientRegistrationEventQueueManager.add(entryEventImpl, mock(ClientUpdateMessageImpl.class),
+        mock(Conflatable.class), filterClientIds, mock(CacheClientNotifier.class));
 
     // The client should no longer be in the filter clients since the event was queued in the
     // client's registration queue.
-    assertThat(filterClientIDs.isEmpty()).isTrue();
+    assertThat(filterClientIds).isEmpty();
   }
 
   @Test
@@ -135,108 +165,96 @@ public class ClientRegistrationEventQueueManagerTest {
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-
-    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
-        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-            new ConcurrentLinkedQueue<>(),
-            new ReentrantReadWriteLock());
+    ClientRegistrationEventQueue clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(mock(ClientProxyMembershipID.class),
+            new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
 
-    List<HAEventWrapper> haEventWrappers = new ArrayList<>();
-    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+    Collection<HAEventWrapper> haEventWrappers = new ArrayList<>();
 
     for (int i = 0; i < 5; ++i) {
+      EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+
+      when(entryEventImpl.getOperation())
+          .thenReturn(operation);
+      when(operation.isEntry())
+          .thenReturn(true);
+
       HAEventWrapper haEventWrapper = mock(HAEventWrapper.class);
       haEventWrappers.add(haEventWrapper);
-      InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
-      when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
-      when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
-      clientRegistrationEventQueueManager.add(internalCacheEvent,
-          haEventWrapper, new HashSet<>(), cacheClientNotifier);
-      verify(haEventWrapper, times(1)).incrementPutInProgressCounter(anyString());
+
+      clientRegistrationEventQueueManager.add(entryEventImpl,
+          mock(ClientUpdateMessageImpl.class), haEventWrapper, emptySet(), cacheClientNotifier);
+
+      verify(haEventWrapper).incrementPutInProgressCounter(anyString());
     }
 
     clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
 
     for (HAEventWrapper haEventWrapper : haEventWrappers) {
-      verify(haEventWrapper, times(1)).decrementPutInProgressCounter();
+      verify(haEventWrapper).decrementPutInProgressCounter();
     }
   }
 
   @Test
-  public void addAndDrainQueueContentionTest() throws ExecutionException, InterruptedException {
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-    ReentrantReadWriteLock mockPutDrainLock = mock(ReentrantReadWriteLock.class);
-    ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
-
-    when(mockPutDrainLock.readLock())
-        .thenReturn(actualPutDrainLock.readLock());
+  public void addAndDrainQueueContentionTest() throws Exception {
+    ReentrantReadWriteLock readWriteLock = spy(new ReentrantReadWriteLock());
 
-    when(mockPutDrainLock.writeLock())
-        .thenAnswer(i -> {
+    when(readWriteLock.writeLock())
+        .thenAnswer((Answer<WriteLock>) invocation -> {
           // Force a context switch from drain to put thread so we can ensure the event is not lost
           Thread.sleep(1);
-          return actualPutDrainLock.writeLock();
+          return (WriteLock) invocation.callRealMethod();
         });
 
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
-    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
-        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-            new ConcurrentLinkedQueue<>(), mockPutDrainLock);
-
-    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
-    when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
-    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
-
-    Conflatable conflatable = mock(Conflatable.class);
-    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
-    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
-    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
-    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+    ClientRegistrationEventQueue clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(mock(ClientProxyMembershipID.class),
+            new ConcurrentLinkedQueue<>(), readWriteLock);
 
     CompletableFuture<Void> addEventsToQueueTask = CompletableFuture.runAsync(() -> {
-      for (int numAdds = 0; numAdds < 100000; ++numAdds) {
+      for (int count = 0; count < 1_000; ++count) { // was 100_000
         // In thread one, we add events to the queue
-        clientRegistrationEventQueueManager
-            .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
+        clientRegistrationEventQueueManager.add(entryEventImpl(),
+            mock(ClientUpdateMessageImpl.class), mock(Conflatable.class), emptySet(),
+            cacheClientNotifier);
       }
     });
 
     CompletableFuture<Void> drainEventsFromQueueTask = CompletableFuture.runAsync(() -> {
       // In thread two, we drain events from the queue
-      clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
+      clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
+          cacheClientNotifier);
     });
 
-    CompletableFuture.allOf(addEventsToQueueTask, drainEventsFromQueueTask).get();
+    CompletableFuture
+        .allOf(addEventsToQueueTask, drainEventsFromQueueTask)
+        .get();
 
     assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
   }
 
   @Test
   public void addEventWithOffheapValueCopiedToHeap() {
-    EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
-    when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
-    Operation mockOperation = mock(Operation.class);
-    when(mockOperation.isEntry()).thenReturn(true);
-    when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
+    // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+    EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
 
-    Conflatable conflatable = mock(Conflatable.class);
-    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
-    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
+    when(entryEventImpl.getOperation())
+        .thenReturn(operation);
+    when(operation.isEntry())
+        .thenReturn(true);
 
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
-    clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+    clientRegistrationEventQueueManager.create(mock(ClientProxyMembershipID.class),
         new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
 
-    clientRegistrationEventQueueManager
-        .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
+    clientRegistrationEventQueueManager.add(entryEventImpl, mock(ClientUpdateMessageImpl.class),
+        mock(Conflatable.class), emptySet(), mock(CacheClientNotifier.class));
 
-    verify(internalCacheEvent, times(1)).copyOffHeapToHeap();
+    verify(entryEventImpl).copyOffHeapToHeap();
   }
 
   @Test
@@ -244,24 +262,17 @@ public class ClientRegistrationEventQueueManagerTest {
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
-    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-
-    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
-        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-            new ConcurrentLinkedQueue<>(),
-            new ReentrantReadWriteLock());
+    ClientRegistrationEventQueue clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(mock(ClientProxyMembershipID.class),
+            new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
 
     clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
 
-    EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
-    Conflatable conflatable = mock(Conflatable.class);
-    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
-
     // Pass a new event to the ClientRegistrationEventQueueManager. This event should not be added
     // to the test client's registration queue, because it should already be removed. We can
     // validate that by asserting that the client's registration queue is empty after the add.
-    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
+    clientRegistrationEventQueueManager.add(mock(InternalCacheEvent.class),
+        mock(ClientUpdateMessageImpl.class), mock(Conflatable.class), emptySet(),
         cacheClientNotifier);
 
     assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
@@ -269,100 +280,128 @@ public class ClientRegistrationEventQueueManagerTest {
 
   @Test
   public void drainThrowsExceptionQueueStillRemoved() {
-    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
-    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+    // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+    EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+    RuntimeException thrownException = new RuntimeException("thrownException");
+
+    when(cacheClientNotifier.getClientProxy(clientProxyMembershipId))
+        .thenReturn(mock(CacheClientProxy.class));
+    when(entryEventImpl.getOperation())
+        .thenReturn(operation);
+    when(entryEventImpl.getRegion())
+        .thenThrow(thrownException);
+    when(operation.isEntry())
+        .thenReturn(true);
 
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
-    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
-        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-            new ConcurrentLinkedQueue<>(),
-            new ReentrantReadWriteLock());
+    ClientRegistrationEventQueue clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(clientProxyMembershipId,
+            new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
 
-    Conflatable conflatable = mock(Conflatable.class);
-    Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
+    Set<ClientProxyMembershipID> filterClientIds = new HashSet<>();
 
-    EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
-    RuntimeException testException = new RuntimeException();
-    when(internalCacheEvent.getRegion()).thenThrow(testException);
-    Operation mockOperation = mock(Operation.class);
-    when(mockOperation.isEntry()).thenReturn(true);
-    when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
+    clientRegistrationEventQueueManager.add(entryEventImpl, clientUpdateMessage,
+        mock(Conflatable.class), filterClientIds, cacheClientNotifier);
 
-    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
-        cacheClientNotifier);
+    Throwable thrown = catchThrowable(() -> {
+      clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
+    });
 
-    assertThatThrownBy(() -> clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
-        cacheClientNotifier))
-            .isEqualTo(testException);
+    assertThat(thrown).isSameAs(thrownException);
 
     // Pass a new event to the ClientRegistrationEventQueueManager. This event should not be added
     // to the test client's registration queue, because it should already be removed. We can
     // validate that by asserting that the client's registration queue is empty after the add.
-    clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
-        cacheClientNotifier);
+    clientRegistrationEventQueueManager.add(entryEventImpl, clientUpdateMessage,
+        mock(Conflatable.class), filterClientIds, cacheClientNotifier);
 
-    assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
+    assertThat(clientRegistrationEventQueue.isEmpty())
+        .withFailMessage(clientRegistrationEventQueue + " should be empty.")
+        .isTrue();
   }
 
   @Test
   public void addEventInOriginalFilterIDsButQueueWasRemovedDueToSuccessfulRegistrationSoEventNotRedelivered() {
-    ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-    CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
-    CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
-    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
-    Set<ClientProxyMembershipID> originalFilterIDs = new HashSet<>();
-    originalFilterIDs.add(clientProxyMembershipID);
-
-    ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
-
-    InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
-    LocalRegion localRegion = mock(LocalRegion.class);
-    FilterProfile filterProfile = mock(FilterProfile.class);
-    FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
-    FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
-
-    when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
-        filterInfo);
-    when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent))
-        .thenReturn(filterRoutingInfo);
-    when(localRegion.getFilterProfile()).thenReturn(filterProfile);
-    when(internalCacheEvent.getRegion()).thenReturn(localRegion);
-    when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
-
-    Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
-    recalculatedFilterClientIDs.add(clientProxyMembershipID);
+    // this test uses the internalCacheEvent field; no EntryEventImpl mock is needed
+    when(cacheClientNotifier.getClientProxy(clientProxyMembershipId))
+        .thenReturn(cacheClientProxy);
     when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile, filterInfo,
         clientUpdateMessage))
-            .thenReturn(recalculatedFilterClientIDs);
-    when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
-    when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
-    ReentrantReadWriteLock mockReadWriteLock = mock(ReentrantReadWriteLock.class);
+            .thenReturn(asSet(clientProxyMembershipId));
+    when(cacheClientProxy.getProxyID())
+        .thenReturn(clientProxyMembershipId);
+    when(internalCacheEvent.getRegion())
+        .thenReturn(internalRegion);
+    when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
+        .thenReturn(filterRoutingInfo);
+    when(filterRoutingInfo.getLocalFilterInfo())
+        .thenReturn(filterInfo);
+    when(internalRegion.getFilterProfile())
+        .thenReturn(filterProfile);
+
+    ReentrantReadWriteLock readWriteLock = spy(new ReentrantReadWriteLock());
+    ReadLock readLock = spy(readWriteLock.readLock());
+
+    when(readWriteLock.readLock())
+        .thenReturn(readLock);
 
     ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
         new ClientRegistrationEventQueueManager();
 
-    ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
-        clientRegistrationEventQueueManager.create(clientProxyMembershipID,
-            new ConcurrentLinkedQueue<>(),
-            mockReadWriteLock);
+    ClientRegistrationEventQueue clientRegistrationEventQueue =
+        clientRegistrationEventQueueManager.create(clientProxyMembershipId,
+            new ConcurrentLinkedQueue<>(), readWriteLock);
 
-    ReentrantReadWriteLock.ReadLock mockReadLock = mock(ReentrantReadWriteLock.ReadLock.class);
-    when(mockReadWriteLock.readLock()).thenReturn(mockReadLock);
-    ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
-    when(mockReadWriteLock.writeLock()).thenReturn(actualPutDrainLock.writeLock());
-    doAnswer(i -> {
+    doAnswer((Answer<Void>) invocation -> {
       clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
-      actualPutDrainLock.readLock();
+      invocation.callRealMethod();
       return null;
-    }).when(mockReadLock).lock();
+    })
+        .when(readLock)
+        .lock();
 
     clientRegistrationEventQueueManager.add(internalCacheEvent, clientUpdateMessage,
-        originalFilterIDs, cacheClientNotifier);
+        clientUpdateMessage, asSet(clientProxyMembershipId), cacheClientNotifier);
+
+    verify(cacheClientProxy, never()).deliverMessage(clientUpdateMessage);
+  }
+
+  @Test
+  public void addEventWithClientTombstoneDoesNotExportNewValue() {
+    ClientTombstoneMessage clientTombstoneMessage = mock(ClientTombstoneMessage.class);
+    // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+    EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+
+    ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+        new ClientRegistrationEventQueueManager();
+
+    clientRegistrationEventQueueManager.add(entryEventImpl, clientTombstoneMessage,
+        clientTombstoneMessage, asSet(mock(ClientProxyMembershipID.class)),
+        mock(CacheClientNotifier.class));
+
+    verify(entryEventImpl, never()).exportNewValue(clientTombstoneMessage);
+  }
+
+  private EntryEventImpl entryEventImpl() {
+    EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+    Operation operation = operation();
+
+    when(entryEventImpl.getOperation())
+        .thenReturn(operation);
+    when(entryEventImpl.getRegion())
+        .thenReturn(internalRegion);
+
+    return entryEventImpl;
+  }
+
+  private Operation operation() {
+    Operation operation = mock(Operation.class);
+
+    when(operation.isEntry())
+        .thenReturn(true);
 
-    verify(cacheClientProxy, times(0)).deliverMessage(clientUpdateMessage);
+    return operation;
   }
 }
diff --git a/geode-junit/src/main/java/org/apache/geode/test/fake/Fakes.java b/geode-junit/src/main/java/org/apache/geode/test/fake/Fakes.java
index 9cd71e6..1e663a8 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/fake/Fakes.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/fake/Fakes.java
@@ -64,7 +64,7 @@ import org.apache.geode.pdx.internal.TypeRegistry;
  * <pre>
  * cache = Fakes.cache(); Mockito.when(cache.getName()).thenReturn(...)
  *
- * <pre>
+ * </pre>
  *
  * Please help extend this class by adding other commonly used objects to this collection of fakes.
  */