You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by kl...@apache.org on 2021/01/26 21:39:40 UTC
[geode] branch develop updated: GEODE-8811: Fix new value in events
for failing client (#5953)
This is an automated email from the ASF dual-hosted git repository.
klund pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f8987ed GEODE-8811: Fix new value in events for failing client (#5953)
f8987ed is described below
commit f8987edd090558ba17f715e7e5fdb00e898e72eb
Author: Kirk Lund <kl...@apache.org>
AuthorDate: Tue Jan 26 13:38:01 2021 -0800
GEODE-8811: Fix new value in events for failing client (#5953)
* Declare parameters as interfaces instead of implementations.
* Improve unit testing of CacheClientNotifier.
* Use MockitoRule with STRICT_STUBS.
---
.../apache/geode/internal/cache/FilterProfile.java | 10 +
.../geode/internal/cache/InternalRegion.java | 3 +
.../apache/geode/internal/cache/LocalRegion.java | 1 +
.../cache/tier/sockets/CacheClientNotifier.java | 15 +-
.../ClientRegistrationEventQueueManager.java | 73 ++-
.../tier/sockets/ClientUpdateMessageImpl.java | 13 +-
.../tier/sockets/CacheClientNotifierTest.java | 510 +++++++++++----------
.../ClientRegistrationEventQueueManagerTest.java | 421 +++++++++--------
.../java/org/apache/geode/test/fake/Fakes.java | 2 +-
9 files changed, 573 insertions(+), 475 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
index c34e755..5c95fff 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterProfile.java
@@ -2160,6 +2160,16 @@ public class FilterProfile implements DataSerializableFixedID {
this.wireIDs.remove(mappedId);
}
}
+
+ @Override
+ public String toString() {
+ return "IDMap{" +
+ "nextID=" + nextID +
+ ", realIDs=" + realIDs +
+ ", wireIDs=" + wireIDs +
+ ", hasLongID=" + hasLongID +
+ '}';
+ }
}
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
index 876353f..432ecca 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalRegion.java
@@ -27,6 +27,7 @@ import org.apache.geode.Statistics;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.InterestRegistrationEvent;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
@@ -449,6 +450,8 @@ public interface InternalRegion extends Region, HasCachePerfStats, RegionEntryCo
CachePerfStats getRegionPerfStats();
+ void handleInterestEvent(InterestRegistrationEvent event);
+
VersionedObjectList basicRemoveAll(Collection<Object> keys,
DistributedRemoveAllOperation removeAllOp, List<VersionTag> retryVersions);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 96b2738..1833039 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -8619,6 +8619,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
cmnClearRegion(rEvent, false/* cacheWrite */, false/* useRVV */);
}
+ @Override
public void handleInterestEvent(InterestRegistrationEvent event) {
throw new UnsupportedOperationException(
"Region interest registration is only supported for PartitionedRegions");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index aaa7dea..a3a67a3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -89,7 +89,6 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheEvent;
import org.apache.geode.internal.cache.InternalRegion;
-import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionEventImpl;
import org.apache.geode.internal.cache.ha.HAContainerMap;
import org.apache.geode.internal.cache.ha.HAContainerRegion;
@@ -691,15 +690,16 @@ public class CacheClientNotifier {
HAEventWrapper wrapper = new HAEventWrapper(clientMessage);
wrapper.incrementPutInProgressCounter("notify clients");
conflatable = wrapper;
- }
- if (!filterClients.isEmpty()) {
- if (event.getOperation().isEntry()) {
+
+ // include new value in event if the entry is not a tombstone and there are clients
+ if (!filterClients.isEmpty() && event.getOperation().isEntry()) {
EntryEventImpl entryEvent = (EntryEventImpl) event;
entryEvent.exportNewValue(clientMessage);
}
}
- clientRegistrationEventQueueManager.add(event, conflatable, filterClients, this);
+ // add event to temporary queue for clients in process of registering (if any)
+ clientRegistrationEventQueueManager.add(event, clientMessage, conflatable, filterClients, this);
singletonRouteClientMessage(conflatable, filterClients);
@@ -1029,7 +1029,8 @@ public class CacheClientNotifier {
// NOTE: If delta is non-null, value MUST be in Object form of type Delta.
ClientUpdateMessageImpl clientUpdateMsg =
- new ClientUpdateMessageImpl(operation, (LocalRegion) event.getRegion(), keyOfInterest, null,
+ new ClientUpdateMessageImpl(operation, (InternalRegion) event.getRegion(), keyOfInterest,
+ null,
delta, (byte) 0x01, callbackArgument, membershipID, eventIdentifier, versionTag);
if (isNetLoad) {
@@ -1692,7 +1693,7 @@ public class CacheClientNotifier {
}
protected void handleInterestEvent(InterestRegistrationEvent event) {
- LocalRegion region = (LocalRegion) event.getRegion();
+ InternalRegion region = (InternalRegion) event.getRegion();
region.handleInterestEvent(event);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
index e106462..48b6d1d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManager.java
@@ -12,9 +12,9 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
+import java.util.Collection;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -22,12 +22,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.CacheEvent;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo;
import org.apache.geode.internal.cache.InternalCacheEvent;
-import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
@@ -37,15 +39,25 @@ import org.apache.geode.logging.internal.log4j.api.LogService;
*/
public class ClientRegistrationEventQueueManager {
private static final Logger logger = LogService.getLogger();
+
private final Set<ClientRegistrationEventQueue> registeringProxyEventQueues =
ConcurrentHashMap.newKeySet();
void add(final InternalCacheEvent event,
+ final ClientUpdateMessageImpl clientMessage,
final Conflatable conflatable,
final Set<ClientProxyMembershipID> originalFilterClientIDs,
final CacheClientNotifier cacheClientNotifier) {
- if (registeringProxyEventQueues.isEmpty())
+ if (registeringProxyEventQueues.isEmpty()) {
return;
+ }
+
+ if (originalFilterClientIDs.isEmpty()
+ && event.getOperation().isEntry()
+ && !(clientMessage instanceof ClientTombstoneMessage)) {
+ EntryEventImpl entryEvent = (EntryEventImpl) event;
+ entryEvent.exportNewValue(clientMessage);
+ }
ClientRegistrationEvent clientRegistrationEvent =
new ClientRegistrationEvent(event, conflatable);
@@ -109,12 +121,10 @@ public class ClientRegistrationEventQueueManager {
+ " without synchronization");
}
- CacheClientProxy cacheClientProxy = cacheClientNotifier
- .getClientProxy(clientProxyMembershipID);
+ CacheClientProxy cacheClientProxy =
+ cacheClientNotifier.getClientProxy(clientProxyMembershipID);
- drainEventsReceivedWhileRegisteringClient(
- cacheClientProxy,
- clientRegistrationEventQueue,
+ drainEventsReceivedWhileRegisteringClient(cacheClientProxy, clientRegistrationEventQueue,
cacheClientNotifier);
// Prevents additional events from being added to the queue while we process and remove it
@@ -126,9 +136,7 @@ public class ClientRegistrationEventQueueManager {
+ " with synchronization");
}
- drainEventsReceivedWhileRegisteringClient(
- cacheClientProxy,
- clientRegistrationEventQueue,
+ drainEventsReceivedWhileRegisteringClient(cacheClientProxy, clientRegistrationEventQueue,
cacheClientNotifier);
} finally {
// The queue must be removed before attempting to release the drain lock
@@ -169,7 +177,7 @@ public class ClientRegistrationEventQueueManager {
// and local filter info in order to do so. If any of these are null, then there is
// no need to proceed as the client is not interested.
FilterProfile filterProfile =
- ((LocalRegion) internalCacheEvent.getRegion()).getFilterProfile();
+ ((InternalRegion) internalCacheEvent.getRegion()).getFilterProfile();
if (filterProfile != null) {
FilterRoutingInfo filterRoutingInfo =
@@ -217,10 +225,11 @@ public class ClientRegistrationEventQueueManager {
* is interested in the event so we should deliver it.
*/
private boolean eventNotInOriginalFilterClientIDs(final ClientProxyMembershipID proxyID,
- final Set<ClientProxyMembershipID> newFilterClientIDs,
- final Set<ClientProxyMembershipID> originalFilterClientIDs) {
+ final Collection<ClientProxyMembershipID> newFilterClientIDs,
+ final Collection<ClientProxyMembershipID> originalFilterClientIDs) {
return originalFilterClientIDs == null
- || (!originalFilterClientIDs.contains(proxyID) && newFilterClientIDs.contains(proxyID));
+ || !originalFilterClientIDs.contains(proxyID)
+ && newFilterClientIDs.contains(proxyID);
}
/**
@@ -230,7 +239,7 @@ public class ClientRegistrationEventQueueManager {
*
* @param event The InternalCacheEvent whose value will be copied to the heap if need be
*/
- private void copyOffHeapToHeapForRegistrationQueue(final InternalCacheEvent event) {
+ private void copyOffHeapToHeapForRegistrationQueue(final CacheEvent event) {
if (event.getOperation().isEntry()) {
EntryEventImpl entryEvent = ((EntryEventImpl) event);
entryEvent.copyOffHeapToHeap();
@@ -245,8 +254,8 @@ public class ClientRegistrationEventQueueManager {
while ((queuedEvent = registrationEventQueue.poll()) != null) {
InternalCacheEvent internalCacheEvent = queuedEvent.internalCacheEvent;
Conflatable conflatable = queuedEvent.conflatable;
- processEventAndDeliverConflatable(cacheClientProxy,
- cacheClientNotifier, internalCacheEvent, conflatable, null);
+ processEventAndDeliverConflatable(cacheClientProxy, cacheClientNotifier, internalCacheEvent,
+ conflatable, null);
}
}
@@ -258,23 +267,34 @@ public class ClientRegistrationEventQueueManager {
* info and determine if the client which was registering does have a CQ that matches or
* has registered interest in the key.
*/
- private class ClientRegistrationEvent {
+ @VisibleForTesting
+ static class ClientRegistrationEvent {
+
private final Conflatable conflatable;
private final InternalCacheEvent internalCacheEvent;
- ClientRegistrationEvent(final InternalCacheEvent internalCacheEvent,
+ private ClientRegistrationEvent(final InternalCacheEvent internalCacheEvent,
final Conflatable conflatable) {
this.conflatable = conflatable;
this.internalCacheEvent = internalCacheEvent;
}
+
+ @Override
+ public String toString() {
+ return "ClientRegistrationEvent{" +
+ "conflatable=" + conflatable +
+ ", internalCacheEvent=" + internalCacheEvent +
+ '}';
+ }
}
- class ClientRegistrationEventQueue {
+ static class ClientRegistrationEventQueue {
+
private final ClientProxyMembershipID clientProxyMembershipID;
private final Queue<ClientRegistrationEvent> eventQueue;
private final ReentrantReadWriteLock eventAddDrainLock;
- ClientRegistrationEventQueue(
+ private ClientRegistrationEventQueue(
final ClientProxyMembershipID clientProxyMembershipID,
final Queue<ClientRegistrationEvent> eventQueue,
final ReentrantReadWriteLock eventAddDrainLock) {
@@ -283,6 +303,15 @@ public class ClientRegistrationEventQueueManager {
this.eventAddDrainLock = eventAddDrainLock;
}
+ @Override
+ public String toString() {
+ return "ClientRegistrationEventQueue{" +
+ "clientProxyMembershipID=" + clientProxyMembershipID +
+ ", eventQueue=" + eventQueue +
+ ", eventAddDrainLock=" + eventAddDrainLock +
+ '}';
+ }
+
public ClientProxyMembershipID getClientProxyMembershipID() {
return clientProxyMembershipID;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
index bfd6af4..aa60118 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientUpdateMessageImpl.java
@@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
import java.io.DataInput;
@@ -38,7 +37,7 @@ import org.apache.geode.internal.cache.CachedDeserializableFactory;
import org.apache.geode.internal.cache.EntryEventImpl.NewValueImporter;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
-import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.WrappedCallbackArgument;
import org.apache.geode.internal.cache.ha.HAContainerRegion;
import org.apache.geode.internal.cache.tier.MessageType;
@@ -51,12 +50,10 @@ import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.size.Sizeable;
import org.apache.geode.logging.internal.log4j.api.LogService;
-
/**
- * Class <code>ClientUpdateMessageImpl</code> is a message representing a cache operation that is
+ * Class {@code ClientUpdateMessageImpl} is a message representing a cache operation that is
* sent from a server to an interested client.
*
- *
* @since GemFire 4.2
*/
public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, NewValueImporter {
@@ -161,14 +158,14 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
* @param callbackArgument The callback argument
* @param memberId membership id of the originator of the event
*/
- public ClientUpdateMessageImpl(EnumListenerEvent operation, LocalRegion region,
+ public ClientUpdateMessageImpl(EnumListenerEvent operation, InternalRegion region,
Object keyOfInterest, Object value, byte valueIsObject, Object callbackArgument,
ClientProxyMembershipID memberId, EventID eventIdentifier) {
this(operation, region, keyOfInterest, value, null, valueIsObject, callbackArgument, memberId,
eventIdentifier, null);
}
- public ClientUpdateMessageImpl(EnumListenerEvent operation, LocalRegion region,
+ public ClientUpdateMessageImpl(EnumListenerEvent operation, InternalRegion region,
Object keyOfInterest, Object value, byte[] delta, byte valueIsObject, Object callbackArgument,
ClientProxyMembershipID memberId, EventID eventIdentifier, VersionTag versionTag) {
this._operation = operation;
@@ -179,7 +176,7 @@ public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, N
this._callbackArgument = callbackArgument;
this._membershipId = memberId;
this._eventIdentifier = eventIdentifier;
- this._shouldConflate = (isUpdate() && region.getEnableConflation());
+ this._shouldConflate = isUpdate() && region.getEnableConflation();
this.deltaBytes = delta;
this.versionTag = versionTag;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
index 26291f9..b9bb56e 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierTest.java
@@ -14,333 +14,351 @@
*/
package org.apache.geode.internal.cache.tier.sockets;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.quality.Strictness.STRICT_STUBS;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.net.Socket;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
-import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
-import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo;
+import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalCacheEvent;
-import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.RegionQueueException;
+import org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager.ClientRegistrationEventQueue;
import org.apache.geode.internal.statistics.StatisticsClock;
-import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.internal.statistics.StatisticsManager;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class CacheClientNotifierTest {
- @Before
- public void setup() {
+
+ private static final String CQ_NAME = "testCQ";
+ private static final long CQ_ID = 0;
+
+ private final AtomicReference<CountDownLatch> afterLatch =
+ new AtomicReference<>(new CountDownLatch(0));
+ private final AtomicReference<CountDownLatch> beforeLatch =
+ new AtomicReference<>(new CountDownLatch(0));
+
+ private CacheClientProxy cacheClientProxy;
+ private ClientProxyMembershipID clientProxyMembershipId;
+ private ClientRegistrationEventQueueManager clientRegistrationEventQueueManager;
+ private ClientRegistrationMetadata clientRegistrationMetadata;
+ private InternalCache internalCache;
+ private InternalDistributedSystem internalDistributedSystem;
+ private Socket socket;
+ private Statistics statistics;
+ private StatisticsManager statisticsManager;
+ private InternalRegion region;
+
+ private CacheClientNotifier cacheClientNotifier;
+
+ @Rule
+ public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+ @BeforeClass
+ public static void clearStatics() {
// Perform cleanup on any singletons received from previous test runs, since the
// CacheClientNotifier is a static and previous tests may not have cleaned up properly.
- shutdownExistingCacheClientNotifier();
+ CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance();
+ if (cacheClientNotifier != null) {
+ cacheClientNotifier.shutdown(0);
+ }
+ }
+
+ @Before
+ public void setUp() {
+ cacheClientProxy = mock(CacheClientProxy.class);
+ clientProxyMembershipId = mock(ClientProxyMembershipID.class);
+ clientRegistrationEventQueueManager = mock(ClientRegistrationEventQueueManager.class);
+ clientRegistrationMetadata = mock(ClientRegistrationMetadata.class);
+ internalCache = mock(InternalCache.class);
+ internalDistributedSystem = mock(InternalDistributedSystem.class);
+ region = mock(InternalRegion.class);
+ socket = mock(Socket.class);
+ statistics = mock(Statistics.class);
+ statisticsManager = mock(StatisticsManager.class);
}
@After
public void tearDown() {
- shutdownExistingCacheClientNotifier();
+ beforeLatch.get().countDown();
+ afterLatch.get().countDown();
+
+ clearStatics();
}
@Test
public void eventsInClientRegistrationQueueAreSentToClientAfterRegistrationIsComplete()
- throws IOException, ClassNotFoundException, NoSuchMethodException, IllegalAccessException,
- InvocationTargetException, InterruptedException, ExecutionException {
- InternalCache internalCache = Fakes.cache();
- CacheServerStats cacheServerStats = mock(CacheServerStats.class);
- Socket socket = mock(Socket.class);
- ConnectionListener connectionListener = mock(ConnectionListener.class);
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
- CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
- ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
- ClientRegistrationMetadata clientRegistrationMetadata = mock(ClientRegistrationMetadata.class);
- StatisticsClock statisticsClock = mock(StatisticsClock.class);
-
- when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
- clientProxyMembershipID);
-
- CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
- new ClientRegistrationEventQueueManager(), statisticsClock, cacheServerStats, 0, 0,
- connectionListener, null, false);
- final CacheClientNotifier cacheClientNotifierSpy = spy(cacheClientNotifier);
-
- CountDownLatch waitForEventDispatchCountdownLatch = new CountDownLatch(1);
- CountDownLatch waitForRegistrationCountdownLatch = new CountDownLatch(1);
- ExecutorService registerAndNotifyExecutor = Executors.newFixedThreadPool(2);
-
- try {
- // We stub out the CacheClientNotifier.registerClientInternal() to do some "work" until
- // a new event is received and queued, as triggered by the waitForEventDispatchCountdownLatch
- doAnswer((i) -> {
- when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
- cacheClientNotifierSpy.addClientProxy(cacheClientProxy);
- waitForRegistrationCountdownLatch.countDown();
- waitForEventDispatchCountdownLatch.await();
- return null;
- }).when(cacheClientNotifierSpy).registerClientInternal(clientRegistrationMetadata, socket,
- false, 0, true);
- List<Callable<Void>> registerAndNotifyTasks = new ArrayList<>();
-
- // In one thread, we register the new client which should create the temporary client
- // registration event queue. Events will be passed to that queue while registration is
- // underway. Once registration is complete, the queue is drained and the event is processed
- // as normal.
- registerAndNotifyTasks.add(() -> {
- cacheClientNotifierSpy.registerClient(clientRegistrationMetadata, socket, false, 0, true);
- return null;
- });
-
- // In a second thread, we mock the arrival of a new event. We want to ensure this event
- // goes into the temporary client registration event queue. To do that, we wait on the
- // waitForRegistrationCountdownLatch until registration is underway and the temp queue is
- // created. Once it is, we process the event and notify clients, which should add the event
- // to the temp queue. Finally, we resume registration and after it is complete, we verify
- // that the event was drained, processed, and "delivered" (note message delivery is mocked
- // and results in a no-op).
- registerAndNotifyTasks.add(() -> {
- try {
- waitForRegistrationCountdownLatch.await();
-
- InternalCacheEvent internalCacheEvent =
- createMockInternalCacheEvent(clientProxyMembershipID, clientUpdateMessage,
- cacheClientNotifierSpy);
-
- CacheClientNotifier.notifyClients(internalCacheEvent, clientUpdateMessage);
- } finally {
- // Ensure we always countdown so if the test fails it won't hang due to
- // awaiting on this countdown latch.
- waitForEventDispatchCountdownLatch.countDown();
- }
- return null;
- });
-
- final List<Future<Void>> futures =
- registerAndNotifyExecutor.invokeAll(registerAndNotifyTasks);
-
- for (final Future future : futures) {
- future.get();
- }
- } finally {
- // To prevent not cleaning up test resources in case an unexpected exception occurs
- waitForEventDispatchCountdownLatch.countDown();
- waitForRegistrationCountdownLatch.countDown();
- registerAndNotifyExecutor.shutdownNow();
- cacheClientNotifier.shutdown(0);
+ throws Exception {
+ // this test requires real impl instance of ClientRegistrationEventQueueManager
+ clientRegistrationEventQueueManager = new ClientRegistrationEventQueueManager();
+
+ when(cacheClientProxy.getProxyID())
+ .thenReturn(clientProxyMembershipId);
+ when(clientRegistrationMetadata.getClientProxyMembershipID())
+ .thenReturn(clientProxyMembershipId);
+ when(internalCache.getCancelCriterion())
+ .thenReturn(mock(CancelCriterion.class));
+ when(internalCache.getCCPTimer())
+ .thenReturn(mock(SystemTimer.class));
+ when(internalCache.getInternalDistributedSystem())
+ .thenReturn(internalDistributedSystem);
+ when(internalDistributedSystem.getStatisticsManager())
+ .thenReturn(statisticsManager);
+ when(statisticsManager.createAtomicStatistics(any(), any()))
+ .thenReturn(statistics);
+
+ cacheClientNotifier = spy(CacheClientNotifier.getInstance(internalCache,
+ clientRegistrationEventQueueManager, mock(StatisticsClock.class),
+ mock(CacheServerStats.class), 0, 0, mock(ConnectionListener.class), null, false));
+
+ beforeLatch.set(new CountDownLatch(1));
+ afterLatch.set(new CountDownLatch(1));
+
+ // We stub out the CacheClientNotifier.registerClientInternal() to do some "work" until
+ // a new event is received and queued, as triggered by the afterLatch
+ doAnswer(invocation -> {
+ cacheClientNotifier.addClientProxy(cacheClientProxy);
+ beforeLatch.get().countDown();
+ afterLatch.get().await();
+ return null;
+ })
+ .when(cacheClientNotifier)
+ .registerClientInternal(clientRegistrationMetadata, socket, false, 0, true);
+
+ Collection<Callable<Void>> tasks = new ArrayList<>();
+
+ // In one thread, we register the new client which should create the temporary client
+ // registration event queue. Events will be passed to that queue while registration is
+ // underway. Once registration is complete, the queue is drained and the event is processed
+ // as normal.
+ tasks.add(() -> {
+ cacheClientNotifier.registerClient(clientRegistrationMetadata, socket, false, 0, true);
+ return null;
+ });
+
+ // In a second thread, we mock the arrival of a new event. We want to ensure this event
+ // goes into the temporary client registration event queue. To do that, we wait on the
+ // beforeLatch until registration is underway and the temp queue is
+ // created. Once it is, we process the event and notify clients, which should add the event
+ // to the temp queue. Finally, we resume registration and after it is complete, we verify
+ // that the event was drained, processed, and "delivered" (note message delivery is mocked
+ // and results in a no-op).
+ tasks.add(() -> {
+ beforeLatch.get().await();
+
+ InternalCacheEvent internalCacheEvent = internalCacheEvent(clientProxyMembershipId);
+ ClientUpdateMessageImpl clientUpdateMessageImpl = mock(ClientUpdateMessageImpl.class);
+ CacheClientNotifier.notifyClients(internalCacheEvent, clientUpdateMessageImpl);
+
+ afterLatch.get().countDown();
+ return null;
+ });
+
+ for (Future future : executorServiceRule.getExecutorService().invokeAll(tasks)) {
+ future.get();
}
- verify(cacheClientProxy, times(1)).deliverMessage(isA(HAEventWrapper.class));
+ verify(cacheClientProxy).deliverMessage(any());
}
@Test
- public void initializingMessageShouldntSerializeValuePrematurely() throws Exception {
- InternalCache internalCache = Fakes.cache();
- CacheServerStats cacheServerStats = mock(CacheServerStats.class);
- ConnectionListener connectionListener = mock(ConnectionListener.class);
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
- ClientRegistrationMetadata clientRegistrationMetadata = mock(ClientRegistrationMetadata.class);
- StatisticsClock statisticsClock = mock(StatisticsClock.class);
-
- when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
- clientProxyMembershipID);
-
- CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
- new ClientRegistrationEventQueueManager(), statisticsClock, cacheServerStats, 0, 0,
- connectionListener, null, false);
- LocalRegion region = mock(LocalRegion.class);
-
- EntryEventImpl entryEvent = mock(EntryEventImpl.class);
- when(entryEvent.getEventType()).thenReturn(EnumListenerEvent.AFTER_CREATE);
- when(entryEvent.getOperation()).thenReturn(Operation.CREATE);
- when(entryEvent.getRegion()).thenReturn(region);
- cacheClientNotifier.constructClientMessage(entryEvent);
- verify(entryEvent, times(0)).exportNewValue(any());
+ public void initializingMessageDoesNotSerializeValuePrematurely() {
+ // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+
+ when(entryEventImpl.getEventType())
+ .thenReturn(EnumListenerEvent.AFTER_CREATE);
+ when(entryEventImpl.getOperation())
+ .thenReturn(Operation.CREATE);
+ when(entryEventImpl.getRegion())
+ .thenReturn(region);
+ when(internalCache.getCCPTimer())
+ .thenReturn(mock(SystemTimer.class));
+ when(internalCache.getInternalDistributedSystem())
+ .thenReturn(internalDistributedSystem);
+ when(internalDistributedSystem.getStatisticsManager())
+ .thenReturn(statisticsManager);
+ when(statisticsManager.createAtomicStatistics(any(), any()))
+ .thenReturn(statistics);
+
+ cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+ mock(ClientRegistrationEventQueueManager.class), mock(StatisticsClock.class),
+ mock(CacheServerStats.class), 0, 0, mock(ConnectionListener.class), null, false);
+
+ cacheClientNotifier.constructClientMessage(entryEventImpl);
+
+ verify(entryEventImpl, never()).exportNewValue(any());
}
@Test
- public void clientRegistrationFailsQueueStillDrained()
- throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException,
- IllegalAccessException, IOException {
- InternalCache internalCache = Fakes.cache();
- CacheServerStats cacheServerStats = mock(CacheServerStats.class);
- Socket socket = mock(Socket.class);
- ConnectionListener connectionListener = mock(ConnectionListener.class);
- ClientRegistrationMetadata clientRegistrationMetadata = mock(ClientRegistrationMetadata.class);
- StatisticsClock statisticsClock = mock(StatisticsClock.class);
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
- ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
- mock(ClientRegistrationEventQueueManager.class);
- ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
- mock(ClientRegistrationEventQueueManager.ClientRegistrationEventQueue.class);
-
- when(clientRegistrationMetadata.getClientProxyMembershipID()).thenReturn(
- clientProxyMembershipID);
- when(clientRegistrationEventQueueManager.create(eq(clientProxyMembershipID), any(), any()))
- .thenReturn(clientRegistrationEventQueue);
-
- CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
- clientRegistrationEventQueueManager, statisticsClock, cacheServerStats, 0, 0,
- connectionListener, null, false);
- CacheClientNotifier cacheClientNotifierSpy = spy(cacheClientNotifier);
+ public void clientRegistrationFailsQueueStillDrained() throws Exception {
+ ClientRegistrationEventQueue clientRegistrationEventQueue =
+ mock(ClientRegistrationEventQueue.class);
- doAnswer((i) -> {
- throw new RegionQueueException();
- }).when(cacheClientNotifierSpy).registerClientInternal(clientRegistrationMetadata, socket,
- false, 0, true);
-
- assertThatThrownBy(() -> cacheClientNotifierSpy.registerClient(clientRegistrationMetadata,
- socket, false, 0, true))
- .isInstanceOf(IOException.class);
-
- verify(clientRegistrationEventQueueManager, times(1)).create(
- eq(clientProxyMembershipID), any(), any());
-
- verify(clientRegistrationEventQueueManager, times(1)).drain(
- eq(clientRegistrationEventQueue),
- eq(cacheClientNotifierSpy));
- }
-
- private InternalCacheEvent createMockInternalCacheEvent(
- final ClientProxyMembershipID clientProxyMembershipID,
- final ClientUpdateMessageImpl clientUpdateMessage,
- final CacheClientNotifier cacheClientNotifierSpy) {
- InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
-
- DistributedRegion region = mock(DistributedRegion.class);
- when(internalCacheEvent.getRegion()).thenReturn(region);
-
- FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
- final Long cqId = 0L;
- final String cqName = "testCQ";
-
- HashMap cqs = new HashMap<Long, Integer>() {
- {
- put(cqId, 123);
- }
- };
- when(filterInfo.getCQs()).thenReturn(cqs);
- when(internalCacheEvent.getLocalFilterInfo()).thenReturn(
- filterInfo);
- when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
-
- FilterProfile filterProfile = mock(FilterProfile.class);
- when(filterProfile.getRealCqID(cqId)).thenReturn(cqName);
-
- ServerCQ serverCQ = mock(ServerCQ.class);
- when(serverCQ.getClientProxyId()).thenReturn(clientProxyMembershipID);
- when(filterProfile.getCq(cqName)).thenReturn(serverCQ);
- when(region.getFilterProfile()).thenReturn(filterProfile);
-
- FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
- when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(filterInfo);
- when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
- .thenReturn(filterRoutingInfo);
- doReturn(clientUpdateMessage).when(cacheClientNotifierSpy)
- .constructClientMessage(internalCacheEvent);
-
- return internalCacheEvent;
+ when(clientRegistrationEventQueueManager.create(eq(clientProxyMembershipId), any(), any()))
+ .thenReturn(clientRegistrationEventQueue);
+ when(clientRegistrationMetadata.getClientProxyMembershipID())
+ .thenReturn(clientProxyMembershipId);
+ when(internalCache.getCCPTimer())
+ .thenReturn(mock(SystemTimer.class));
+ when(internalCache.getInternalDistributedSystem())
+ .thenReturn(internalDistributedSystem);
+ when(internalDistributedSystem.getStatisticsManager())
+ .thenReturn(statisticsManager);
+ when(statisticsManager.createAtomicStatistics(any(), any()))
+ .thenReturn(statistics);
+
+ cacheClientNotifier = spy(CacheClientNotifier.getInstance(internalCache,
+ clientRegistrationEventQueueManager, mock(StatisticsClock.class),
+ mock(CacheServerStats.class), 0, 0, mock(ConnectionListener.class), null, false));
+
+ doThrow(new RegionQueueException("thrown during client registration"))
+ .when(cacheClientNotifier)
+ .registerClientInternal(clientRegistrationMetadata, socket, false, 0, true);
+
+ Throwable thrown = catchThrowable(() -> {
+ cacheClientNotifier.registerClient(clientRegistrationMetadata, socket, false, 0, true);
+ });
+ assertThat(thrown).isInstanceOf(IOException.class);
+
+ verify(clientRegistrationEventQueueManager)
+ .create(eq(clientProxyMembershipId), any(), any());
+
+ verify(clientRegistrationEventQueueManager)
+ .drain(eq(clientRegistrationEventQueue), eq(cacheClientNotifier));
}
@Test
public void testSingletonHasClientProxiesFalseNoCCN() {
- assertFalse(CacheClientNotifier.singletonHasClientProxies());
+ assertThat(CacheClientNotifier.singletonHasClientProxies()).isFalse();
}
@Test
public void testSingletonHasClientProxiesFalseNoProxy() {
- InternalCache internalCache = Fakes.cache();
+ when(internalCache.getCCPTimer())
+ .thenReturn(mock(SystemTimer.class));
- CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(internalCache,
- mock(ClientRegistrationEventQueueManager.class),
- mock(StatisticsClock.class),
- mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
-
- assertFalse(CacheClientNotifier.singletonHasClientProxies());
- ccn.shutdown(111);
+ cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+ mock(ClientRegistrationEventQueueManager.class), mock(StatisticsClock.class),
+ mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
+ assertThat(CacheClientNotifier.singletonHasClientProxies()).isFalse();
}
@Test
public void testSingletonHasClientProxiesTrue() {
- InternalCache internalCache = Fakes.cache();
- CacheClientProxy proxy = mock(CacheClientProxy.class);
+ when(cacheClientProxy.getAcceptorId())
+ .thenReturn(111L);
+ when(cacheClientProxy.getProxyID())
+ .thenReturn(mock(ClientProxyMembershipID.class));
+ when(internalCache.getCCPTimer())
+ .thenReturn(mock(SystemTimer.class));
- CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(internalCache,
- mock(ClientRegistrationEventQueueManager.class),
- mock(StatisticsClock.class),
- mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
+ cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+ mock(ClientRegistrationEventQueueManager.class), mock(StatisticsClock.class),
+ mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
- when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
- ccn.addClientProxy(proxy);
+ cacheClientNotifier.addClientProxy(cacheClientProxy);
// check ClientProxy Map is not empty
- assertTrue(CacheClientNotifier.singletonHasClientProxies());
-
- when(proxy.getAcceptorId()).thenReturn(Long.valueOf(111));
- ccn.shutdown(111);
+ assertThat(CacheClientNotifier.singletonHasClientProxies()).isTrue();
}
@Test
public void testSingletonHasInitClientProxiesTrue() {
- InternalCache internalCache = Fakes.cache();
- CacheClientProxy proxy = mock(CacheClientProxy.class);
+ when(cacheClientProxy.getAcceptorId())
+ .thenReturn(111L);
+ when(cacheClientProxy.getProxyID())
+ .thenReturn(mock(ClientProxyMembershipID.class));
+ when(internalCache.getCCPTimer())
+ .thenReturn(mock(SystemTimer.class));
- CacheClientNotifier ccn =
- CacheClientNotifier.getInstance(internalCache,
- mock(ClientRegistrationEventQueueManager.class),
- mock(StatisticsClock.class),
- mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
+ cacheClientNotifier = CacheClientNotifier.getInstance(internalCache,
+ mock(ClientRegistrationEventQueueManager.class), mock(StatisticsClock.class),
+ mock(CacheServerStats.class), 10, 10, mock(ConnectionListener.class), null, true);
- when(proxy.getProxyID()).thenReturn(mock(ClientProxyMembershipID.class));
- ccn.addClientInitProxy(proxy);
+ cacheClientNotifier.addClientInitProxy(cacheClientProxy);
// check InitClientProxy Map is not empty
- assertTrue(CacheClientNotifier.singletonHasClientProxies());
+ assertThat(CacheClientNotifier.singletonHasClientProxies()).isTrue();
- ccn.addClientProxy(proxy);
+ cacheClientNotifier.addClientProxy(cacheClientProxy);
// check ClientProxy Map is not empty
- assertTrue(CacheClientNotifier.singletonHasClientProxies());
-
- when(proxy.getAcceptorId()).thenReturn(Long.valueOf(111));
- ccn.shutdown(111);
+ assertThat(CacheClientNotifier.singletonHasClientProxies()).isTrue();
}
- private void shutdownExistingCacheClientNotifier() {
- CacheClientNotifier cacheClientNotifier = CacheClientNotifier.getInstance();
- if (cacheClientNotifier != null) {
- cacheClientNotifier.shutdown(0);
- }
+ private InternalCacheEvent internalCacheEvent(ClientProxyMembershipID clientProxyMembershipID) {
+ FilterInfo filterInfo = mock(FilterInfo.class);
+ FilterProfile filterProfile = mock(FilterProfile.class);
+ FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
+ InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
+ ServerCQ serverCQ = mock(ServerCQ.class);
+
+ HashMap<Long, Integer> cqs = new HashMap<>();
+ cqs.put(CQ_ID, 123);
+
+ when(filterInfo.getCQs())
+ .thenReturn(cqs);
+ when(filterProfile.getCq(CQ_NAME))
+ .thenReturn(serverCQ);
+ when(filterProfile.getRealCqID(CQ_ID))
+ .thenReturn(CQ_NAME);
+ when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
+ .thenReturn(filterRoutingInfo);
+ when(filterRoutingInfo.getLocalFilterInfo())
+ .thenReturn(filterInfo);
+ when(internalCacheEvent.getRegion())
+ .thenReturn(region);
+ when(internalCacheEvent.getLocalFilterInfo())
+ .thenReturn(filterInfo);
+ when(internalCacheEvent.getOperation())
+ .thenReturn(mock(Operation.class));
+ when(region.getFilterProfile())
+ .thenReturn(filterProfile);
+ when(serverCQ.getClientProxyId())
+ .thenReturn(clientProxyMembershipID);
+
+ return internalCacheEvent;
}
+
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
index 7ef75fe..150a763 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationEventQueueManagerTest.java
@@ -12,122 +12,152 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
-
package org.apache.geode.internal.cache.tier.sockets;
+import static java.util.Collections.emptySet;
+import static org.apache.geode.internal.util.CollectionUtils.asSet;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.catchThrowable;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.mockito.quality.Strictness.STRICT_STUBS;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.mockito.stubbing.Answer;
import org.apache.geode.cache.Operation;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo;
+import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.InternalCacheEvent;
-import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.tier.sockets.ClientRegistrationEventQueueManager.ClientRegistrationEventQueue;
public class ClientRegistrationEventQueueManagerTest {
- @Test
- public void messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient() {
- ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
- new ClientRegistrationEventQueueManager();
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-
- ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
+ private CacheClientNotifier cacheClientNotifier;
+ private CacheClientProxy cacheClientProxy;
+ private ClientProxyMembershipID clientProxyMembershipId;
+ private ClientUpdateMessageImpl clientUpdateMessage;
+ private FilterInfo filterInfo;
+ private FilterProfile filterProfile;
+ private FilterRoutingInfo filterRoutingInfo;
+ private InternalCacheEvent internalCacheEvent;
+ private InternalRegion internalRegion;
+ private Operation operation;
+
+ @Rule
+ public MockitoRule mockitoRule = MockitoJUnit.rule().strictness(STRICT_STUBS);
+
+ @Before
+ public void setUp() {
+ cacheClientNotifier = mock(CacheClientNotifier.class);
+ cacheClientProxy = mock(CacheClientProxy.class);
+ clientProxyMembershipId = mock(ClientProxyMembershipID.class);
+ clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+ filterInfo = mock(FilterInfo.class);
+ filterProfile = mock(FilterProfile.class);
+ filterRoutingInfo = mock(FilterRoutingInfo.class);
+ internalCacheEvent = mock(InternalCacheEvent.class);
+ internalRegion = mock(InternalRegion.class);
+ operation = mock(Operation.class);
+ }
- InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
- LocalRegion localRegion = mock(LocalRegion.class);
- FilterProfile filterProfile = mock(FilterProfile.class);
- FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
- FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+ @Test
+ public void messageDeliveredAfterRegisteringOnDrainIfNewFilterIDsIncludesClient() {
+ // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
- when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
- filterInfo);
- when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
+ when(cacheClientNotifier.getClientProxy(clientProxyMembershipId))
+ .thenReturn(cacheClientProxy);
+ when(cacheClientNotifier.getFilterClientIDs(entryEventImpl, filterProfile, filterInfo,
+ clientUpdateMessage))
+ .thenReturn(asSet(clientProxyMembershipId));
+ when(cacheClientProxy.getProxyID())
+ .thenReturn(clientProxyMembershipId);
+ when(entryEventImpl.getOperation())
+ .thenReturn(operation);
+ when(entryEventImpl.getRegion())
+ .thenReturn(internalRegion);
+ when(filterProfile.getFilterRoutingInfoPart2(null, entryEventImpl, true))
.thenReturn(filterRoutingInfo);
- when(localRegion.getFilterProfile()).thenReturn(filterProfile);
- when(internalCacheEvent.getRegion()).thenReturn(localRegion);
- when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
+ when(filterRoutingInfo.getLocalFilterInfo())
+ .thenReturn(filterInfo);
+ when(internalRegion.getFilterProfile())
+ .thenReturn(filterProfile);
+ when(operation.isEntry())
+ .thenReturn(true);
- ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+ new ClientRegistrationEventQueueManager();
- CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
- Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
- recalculatedFilterClientIDs.add(clientProxyMembershipID);
- when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile, filterInfo,
- clientUpdateMessage))
- .thenReturn(recalculatedFilterClientIDs);
- CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
- when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
- when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+ ClientRegistrationEventQueue clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(clientProxyMembershipId,
+ new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
// Create empty filter client IDs produced by the "normal" put processing path, so we can test
// that the event is still delivered if the client finished registering and needs the event.
- Set<ClientProxyMembershipID> normalPutFilterClientIDs = new HashSet<>();
- clientRegistrationEventQueueManager
- .add(internalCacheEvent, clientUpdateMessage, normalPutFilterClientIDs,
- cacheClientNotifier);
+ clientRegistrationEventQueueManager.add(entryEventImpl, clientUpdateMessage,
+ clientUpdateMessage, emptySet(), cacheClientNotifier);
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
// The client update message should still be delivered because it is now part of the
// filter clients interested in this event, despite having not been included in the original
// filter info in the "normal" put processing path.
- verify(cacheClientProxy, times(1)).deliverMessage(clientUpdateMessage);
+ verify(cacheClientProxy).deliverMessage(clientUpdateMessage);
}
@Test
public void clientRemovedFromFilterClientsListIfEventAddedToRegistrationQueue() {
+ // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+
+ when(entryEventImpl.getOperation())
+ .thenReturn(operation);
+ when(operation.isEntry())
+ .thenReturn(true);
+
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+ clientRegistrationEventQueueManager.create(clientProxyMembershipId,
new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
- InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
- when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
- when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
-
- Conflatable conflatable = mock(Conflatable.class);
-
// Add the registering client to the filter clients. This can happen if the filter info is
// received but the client is not completely registered yet (queue GII has not been completed).
// In that case, we want to remove the client from the filter IDs set and add the event
// to the client's registration queue.
- Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
- filterClientIDs.add(clientProxyMembershipID);
-
- CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+ Set<ClientProxyMembershipID> filterClientIds = asSet(clientProxyMembershipId);
- clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
- cacheClientNotifier);
+ clientRegistrationEventQueueManager.add(entryEventImpl, mock(ClientUpdateMessageImpl.class),
+ mock(Conflatable.class), filterClientIds, mock(CacheClientNotifier.class));
// The client should no longer be in the filter clients since the event was queued in the
// client's registration queue.
- assertThat(filterClientIDs.isEmpty()).isTrue();
+ assertThat(filterClientIds).isEmpty();
}
@Test
@@ -135,108 +165,96 @@ public class ClientRegistrationEventQueueManagerTest {
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-
- ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(),
- new ReentrantReadWriteLock());
+ ClientRegistrationEventQueue clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(mock(ClientProxyMembershipID.class),
+ new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
- List<HAEventWrapper> haEventWrappers = new ArrayList<>();
- CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
+ Collection<HAEventWrapper> haEventWrappers = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+
+ when(entryEventImpl.getOperation())
+ .thenReturn(operation);
+ when(operation.isEntry())
+ .thenReturn(true);
+
HAEventWrapper haEventWrapper = mock(HAEventWrapper.class);
haEventWrappers.add(haEventWrapper);
- InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
- when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
- when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
- clientRegistrationEventQueueManager.add(internalCacheEvent,
- haEventWrapper, new HashSet<>(), cacheClientNotifier);
- verify(haEventWrapper, times(1)).incrementPutInProgressCounter(anyString());
+
+ clientRegistrationEventQueueManager.add(entryEventImpl,
+ mock(ClientUpdateMessageImpl.class), haEventWrapper, emptySet(), cacheClientNotifier);
+
+ verify(haEventWrapper).incrementPutInProgressCounter(anyString());
}
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
for (HAEventWrapper haEventWrapper : haEventWrappers) {
- verify(haEventWrapper, times(1)).decrementPutInProgressCounter();
+ verify(haEventWrapper).decrementPutInProgressCounter();
}
}
@Test
- public void addAndDrainQueueContentionTest() throws ExecutionException, InterruptedException {
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
- ReentrantReadWriteLock mockPutDrainLock = mock(ReentrantReadWriteLock.class);
- ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
-
- when(mockPutDrainLock.readLock())
- .thenReturn(actualPutDrainLock.readLock());
+ public void addAndDrainQueueContentionTest() throws Exception {
+ ReentrantReadWriteLock readWriteLock = spy(new ReentrantReadWriteLock());
- when(mockPutDrainLock.writeLock())
- .thenAnswer(i -> {
+ when(readWriteLock.writeLock())
+ .thenAnswer((Answer<WriteLock>) invocation -> {
// Force a context switch from drain to put thread so we can ensure the event is not lost
Thread.sleep(1);
- return actualPutDrainLock.writeLock();
+ return (WriteLock) invocation.callRealMethod();
});
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
- ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(), mockPutDrainLock);
-
- InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
- when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
- when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
-
- Conflatable conflatable = mock(Conflatable.class);
- Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
- CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
- CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
- when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+ ClientRegistrationEventQueue clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(mock(ClientProxyMembershipID.class),
+ new ConcurrentLinkedQueue<>(), readWriteLock);
CompletableFuture<Void> addEventsToQueueTask = CompletableFuture.runAsync(() -> {
- for (int numAdds = 0; numAdds < 100000; ++numAdds) {
+ for (int count = 0; count < 1_000; ++count) { // was 100_000
// In thread one, we add events to the queue
- clientRegistrationEventQueueManager
- .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
+ clientRegistrationEventQueueManager.add(entryEventImpl(),
+ mock(ClientUpdateMessageImpl.class), mock(Conflatable.class), emptySet(),
+ cacheClientNotifier);
}
});
CompletableFuture<Void> drainEventsFromQueueTask = CompletableFuture.runAsync(() -> {
// In thread two, we drain events from the queue
- clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
+ clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
+ cacheClientNotifier);
});
- CompletableFuture.allOf(addEventsToQueueTask, drainEventsFromQueueTask).get();
+ CompletableFuture
+ .allOf(addEventsToQueueTask, drainEventsFromQueueTask)
+ .get();
assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
}
@Test
public void addEventWithOffheapValueCopiedToHeap() {
- EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
- when(internalCacheEvent.getRegion()).thenReturn(mock(LocalRegion.class));
- Operation mockOperation = mock(Operation.class);
- when(mockOperation.isEntry()).thenReturn(true);
- when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
+ // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
- Conflatable conflatable = mock(Conflatable.class);
- Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
- CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
+ when(entryEventImpl.getOperation())
+ .thenReturn(operation);
+ when(operation.isEntry())
+ .thenReturn(true);
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
+ clientRegistrationEventQueueManager.create(mock(ClientProxyMembershipID.class),
new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
- clientRegistrationEventQueueManager
- .add(internalCacheEvent, conflatable, filterClientIDs, cacheClientNotifier);
+ clientRegistrationEventQueueManager.add(entryEventImpl, mock(ClientUpdateMessageImpl.class),
+ mock(Conflatable.class), emptySet(), mock(CacheClientNotifier.class));
- verify(internalCacheEvent, times(1)).copyOffHeapToHeap();
+ verify(entryEventImpl).copyOffHeapToHeap();
}
@Test
@@ -244,24 +262,17 @@ public class ClientRegistrationEventQueueManagerTest {
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
- CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
-
- ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(),
- new ReentrantReadWriteLock());
+ ClientRegistrationEventQueue clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(mock(ClientProxyMembershipID.class),
+ new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
- EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
- Conflatable conflatable = mock(Conflatable.class);
- Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
-
// Pass a new event to the ClientRegistrationEventQueueManager. This event should not be added
// to the test client's registration queue, because it should already be removed. We can
// validate that by asserting that the client's registration queue is empty after the add.
- clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
+ clientRegistrationEventQueueManager.add(mock(InternalCacheEvent.class),
+ mock(ClientUpdateMessageImpl.class), mock(Conflatable.class), emptySet(),
cacheClientNotifier);
assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
@@ -269,100 +280,128 @@ public class ClientRegistrationEventQueueManagerTest {
@Test
public void drainThrowsExceptionQueueStillRemoved() {
- CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
- CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
- when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
+ // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+ RuntimeException thrownException = new RuntimeException("thrownException");
+
+ when(cacheClientNotifier.getClientProxy(clientProxyMembershipId))
+ .thenReturn(mock(CacheClientProxy.class));
+ when(entryEventImpl.getOperation())
+ .thenReturn(operation);
+ when(entryEventImpl.getRegion())
+ .thenThrow(thrownException);
+ when(operation.isEntry())
+ .thenReturn(true);
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
- ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(),
- new ReentrantReadWriteLock());
+ ClientRegistrationEventQueue clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(clientProxyMembershipId,
+ new ConcurrentLinkedQueue<>(), new ReentrantReadWriteLock());
- Conflatable conflatable = mock(Conflatable.class);
- Set<ClientProxyMembershipID> filterClientIDs = new HashSet<>();
+ Set<ClientProxyMembershipID> filterClientIds = new HashSet<>();
- EntryEventImpl internalCacheEvent = mock(EntryEventImpl.class);
- RuntimeException testException = new RuntimeException();
- when(internalCacheEvent.getRegion()).thenThrow(testException);
- Operation mockOperation = mock(Operation.class);
- when(mockOperation.isEntry()).thenReturn(true);
- when(internalCacheEvent.getOperation()).thenReturn(mockOperation);
+ clientRegistrationEventQueueManager.add(entryEventImpl, clientUpdateMessage,
+ mock(Conflatable.class), filterClientIds, cacheClientNotifier);
- clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
- cacheClientNotifier);
+ Throwable thrown = catchThrowable(() -> {
+ clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
+ });
- assertThatThrownBy(() -> clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue,
- cacheClientNotifier))
- .isEqualTo(testException);
+ assertThat(thrown).isSameAs(thrownException);
// Pass a new event to the ClientRegistrationEventQueueManager. This event should not be added
// to the test client's registration queue, because it should already be removed. We can
// validate that by asserting that the client's registration queue is empty after the add.
- clientRegistrationEventQueueManager.add(internalCacheEvent, conflatable, filterClientIDs,
- cacheClientNotifier);
+ clientRegistrationEventQueueManager.add(entryEventImpl, clientUpdateMessage,
+ mock(Conflatable.class), filterClientIds, cacheClientNotifier);
- assertThat(clientRegistrationEventQueue.isEmpty()).isTrue();
+ assertThat(clientRegistrationEventQueue.isEmpty())
+ .withFailMessage(clientRegistrationEventQueue + " should be empty.")
+ .isTrue();
}
@Test
public void addEventInOriginalFilterIDsButQueueWasRemovedDueToSuccessfulRegistrationSoEventNotRedelivered() {
- ClientProxyMembershipID clientProxyMembershipID = mock(ClientProxyMembershipID.class);
- CacheClientNotifier cacheClientNotifier = mock(CacheClientNotifier.class);
- CacheClientProxy cacheClientProxy = mock(CacheClientProxy.class);
- when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
- Set<ClientProxyMembershipID> originalFilterIDs = new HashSet<>();
- originalFilterIDs.add(clientProxyMembershipID);
-
- ClientUpdateMessageImpl clientUpdateMessage = mock(ClientUpdateMessageImpl.class);
-
- InternalCacheEvent internalCacheEvent = mock(InternalCacheEvent.class);
- LocalRegion localRegion = mock(LocalRegion.class);
- FilterProfile filterProfile = mock(FilterProfile.class);
- FilterRoutingInfo filterRoutingInfo = mock(FilterRoutingInfo.class);
- FilterRoutingInfo.FilterInfo filterInfo = mock(FilterRoutingInfo.FilterInfo.class);
-
- when(filterRoutingInfo.getLocalFilterInfo()).thenReturn(
- filterInfo);
- when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent))
- .thenReturn(filterRoutingInfo);
- when(localRegion.getFilterProfile()).thenReturn(filterProfile);
- when(internalCacheEvent.getRegion()).thenReturn(localRegion);
- when(internalCacheEvent.getOperation()).thenReturn(mock(Operation.class));
-
- Set<ClientProxyMembershipID> recalculatedFilterClientIDs = new HashSet<>();
- recalculatedFilterClientIDs.add(clientProxyMembershipID);
+ // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+ when(cacheClientNotifier.getClientProxy(clientProxyMembershipId))
+ .thenReturn(cacheClientProxy);
when(cacheClientNotifier.getFilterClientIDs(internalCacheEvent, filterProfile, filterInfo,
clientUpdateMessage))
- .thenReturn(recalculatedFilterClientIDs);
- when(cacheClientProxy.getProxyID()).thenReturn(clientProxyMembershipID);
- when(cacheClientNotifier.getClientProxy(clientProxyMembershipID)).thenReturn(cacheClientProxy);
- ReentrantReadWriteLock mockReadWriteLock = mock(ReentrantReadWriteLock.class);
+ .thenReturn(asSet(clientProxyMembershipId));
+ when(cacheClientProxy.getProxyID())
+ .thenReturn(clientProxyMembershipId);
+ when(internalCacheEvent.getRegion())
+ .thenReturn(internalRegion);
+ when(filterProfile.getFilterRoutingInfoPart2(null, internalCacheEvent, true))
+ .thenReturn(filterRoutingInfo);
+ when(filterRoutingInfo.getLocalFilterInfo())
+ .thenReturn(filterInfo);
+ when(internalRegion.getFilterProfile())
+ .thenReturn(filterProfile);
+
+ ReentrantReadWriteLock readWriteLock = spy(new ReentrantReadWriteLock());
+ ReadLock readLock = spy(readWriteLock.readLock());
+
+ when(readWriteLock.readLock())
+ .thenReturn(readLock);
ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
new ClientRegistrationEventQueueManager();
- ClientRegistrationEventQueueManager.ClientRegistrationEventQueue clientRegistrationEventQueue =
- clientRegistrationEventQueueManager.create(clientProxyMembershipID,
- new ConcurrentLinkedQueue<>(),
- mockReadWriteLock);
+ ClientRegistrationEventQueue clientRegistrationEventQueue =
+ clientRegistrationEventQueueManager.create(clientProxyMembershipId,
+ new ConcurrentLinkedQueue<>(), readWriteLock);
- ReentrantReadWriteLock.ReadLock mockReadLock = mock(ReentrantReadWriteLock.ReadLock.class);
- when(mockReadWriteLock.readLock()).thenReturn(mockReadLock);
- ReentrantReadWriteLock actualPutDrainLock = new ReentrantReadWriteLock();
- when(mockReadWriteLock.writeLock()).thenReturn(actualPutDrainLock.writeLock());
- doAnswer(i -> {
+ doAnswer((Answer<Void>) invocation -> {
clientRegistrationEventQueueManager.drain(clientRegistrationEventQueue, cacheClientNotifier);
- actualPutDrainLock.readLock();
+ invocation.callRealMethod();
return null;
- }).when(mockReadLock).lock();
+ })
+ .when(readLock)
+ .lock();
clientRegistrationEventQueueManager.add(internalCacheEvent, clientUpdateMessage,
- originalFilterIDs, cacheClientNotifier);
+ clientUpdateMessage, asSet(clientProxyMembershipId), cacheClientNotifier);
+
+ verify(cacheClientProxy, never()).deliverMessage(clientUpdateMessage);
+ }
+
+ @Test
+ public void addEventWithClientTombstoneDoesNotExportNewValue() {
+ ClientTombstoneMessage clientTombstoneMessage = mock(ClientTombstoneMessage.class);
+ // this test requires mock of EntryEventImpl instead of InternalCacheEvent
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+
+ ClientRegistrationEventQueueManager clientRegistrationEventQueueManager =
+ new ClientRegistrationEventQueueManager();
+
+ clientRegistrationEventQueueManager.add(entryEventImpl, clientTombstoneMessage,
+ clientTombstoneMessage, asSet(mock(ClientProxyMembershipID.class)),
+ mock(CacheClientNotifier.class));
+
+ verify(entryEventImpl, never()).exportNewValue(clientTombstoneMessage);
+ }
+
+ private EntryEventImpl entryEventImpl() {
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+ Operation operation = operation();
+
+ when(entryEventImpl.getOperation())
+ .thenReturn(operation);
+ when(entryEventImpl.getRegion())
+ .thenReturn(internalRegion);
+
+ return entryEventImpl;
+ }
+
+ private Operation operation() {
+ Operation operation = mock(Operation.class);
+
+ when(operation.isEntry())
+ .thenReturn(true);
- verify(cacheClientProxy, times(0)).deliverMessage(clientUpdateMessage);
+ return operation;
}
}
diff --git a/geode-junit/src/main/java/org/apache/geode/test/fake/Fakes.java b/geode-junit/src/main/java/org/apache/geode/test/fake/Fakes.java
index 9cd71e6..1e663a8 100644
--- a/geode-junit/src/main/java/org/apache/geode/test/fake/Fakes.java
+++ b/geode-junit/src/main/java/org/apache/geode/test/fake/Fakes.java
@@ -64,7 +64,7 @@ import org.apache.geode.pdx.internal.TypeRegistry;
* <pre>
* cache = Fakes.cache(); Mockito.when(cache.getName()).thenReturn(...)
*
- * <pre>
+ * </pre>
*
* Please help extend this class by adding other commonly used objects to this collection of fakes.
*/