You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/08/01 18:25:39 UTC
[geode] branch develop updated: GEODE-5420: Protect events in
HAContainer from premature modification
This is an automated email from the ASF dual-hosted git repository.
ladyvader pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 103b467 GEODE-5420: Protect events in HAContainer from premature modification
103b467 is described below
commit 103b467ab6205f6c9d180d5ad28705ce47c7b5bb
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Tue Jul 3 16:26:40 2018 -0700
GEODE-5420: Protect events in HAContainer from premature modification
- Updated putInProgress boolean in HAWrapper to a counter to prevent prematurely setting ClientUpdateMessage to null when events are temporarily queued during a GII or message dispatcher initialization
- decAndRemoveFromHAContainer only removes when putInProgress counter and ref count are 0
- Refactored putEventInHARegion/putConditionallyInHAContainer to prevent overwriting an existing entry in the HAContainer. Also reduces simplifies the code and reduces duplicated logic.
- Wrote missing basic HARegionQueue unit/integration tests, and an integration test to capture setting the ClientUpdateMessage property on HAEventWrapper to null prematurely
- Added new event tracing messages at debug logging level to help track similar issues in the future
Co-authored-by: Ryan McMahon <rm...@pivotal.io>
Co-authored-by: Lynn Hughes-Godfrey <lh...@pivotal.io>
---
.../cache/ha/HARegionQueueIntegrationTest.java | 430 +++++++++++++++++++--
.../internal/cache/ha/HARegionQueueJUnitTest.java | 419 +++++++++++++++++++-
.../CacheClientNotifierIntegrationTest.java | 339 ++++++++++++++++
.../org/apache/geode/internal/cache/HARegion.java | 2 +-
.../geode/internal/cache/ha/HAContainerMap.java | 6 -
.../geode/internal/cache/ha/HARegionQueue.java | 294 +++++++++-----
.../cache/tier/sockets/CacheClientNotifier.java | 59 +--
.../cache/tier/sockets/CacheClientProxy.java | 64 ++-
.../cache/tier/sockets/HAEventWrapper.java | 73 ++--
9 files changed, 1450 insertions(+), 236 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index 8a496d1..a1b71a1 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -18,31 +18,43 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mock;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import util.TestException;
import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsType;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
@@ -52,11 +64,15 @@ import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.DSClock;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.SystemTimer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.CacheServerImpl;
import org.apache.geode.internal.cache.CachedDeserializable;
-import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -66,10 +82,13 @@ import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.VMCachedDeserializable;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
+import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.internal.concurrent.ConcurrentHashSet;
import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
@@ -111,6 +130,34 @@ public class HARegionQueueIntegrationTest {
return cache.createRegionFactory(RegionShortcut.REPLICATE).create("data");
}
+ private InternalCache createMockInternalCache() {
+ InternalCache mockInternalCache = Mockito.mock(InternalCache.class);
+ doReturn(Mockito.mock(SystemTimer.class)).when(mockInternalCache).getCCPTimer();
+ doReturn(Mockito.mock(CancelCriterion.class)).when(mockInternalCache).getCancelCriterion();
+
+ InternalDistributedSystem mockInteralDistributedSystem = createMockInternalDistributedSystem();
+ doReturn(mockInteralDistributedSystem).when(mockInternalCache).getInternalDistributedSystem();
+ doReturn(mockInteralDistributedSystem).when(mockInternalCache).getDistributedSystem();
+
+ return mockInternalCache;
+ }
+
+ private InternalDistributedSystem createMockInternalDistributedSystem() {
+ InternalDistributedSystem mockInternalDistributedSystem =
+ Mockito.mock(InternalDistributedSystem.class);
+ DistributionManager mockDistributionManager = Mockito.mock(DistributionManager.class);
+
+ doReturn(Mockito.mock(InternalDistributedMember.class)).when(mockInternalDistributedSystem)
+ .getDistributedMember();
+ doReturn(Mockito.mock(Statistics.class)).when(mockInternalDistributedSystem)
+ .createAtomicStatistics(any(StatisticsType.class), any(String.class));
+ doReturn(Mockito.mock(DistributionConfig.class)).when(mockDistributionManager).getConfig();
+ doReturn(mockDistributionManager).when(mockInternalDistributedSystem).getDistributionManager();
+ doReturn(Mockito.mock(DSClock.class)).when(mockInternalDistributedSystem).getClock();
+
+ return mockInternalDistributedSystem;
+ }
+
private CacheClientNotifier createCacheClientNotifier() {
// Create a mock CacheClientNotifier
CacheClientNotifier ccn = mock(CacheClientNotifier.class);
@@ -127,9 +174,18 @@ public class HARegionQueueIntegrationTest {
}
@Test
- public void verifyEndGiiQueueingPutsHAEventWrapperNotClientUpdateMessage() throws Exception {
+ public void verifyEndGiiQueueingEmptiesQueueAndHAContainer() throws Exception {
+ // We need to use an actual cache client notifier for this test because we are making assertions
+ // based on the actual CacheClientNotifier.checkAndRemoveFromClientMsgsRegion() method.
+ InternalCache mockInternalCache = createMockInternalCache();
+ CacheClientNotifier cacheClientNotifier =
+ CacheClientNotifier.getInstance(mockInternalCache, Mockito.mock(CacheServerStats.class),
+ 100000, 100000, Mockito.mock(ConnectionListener.class), null, false);
+ PowerMockito.when(CacheClientNotifier.getInstance()).thenReturn(cacheClientNotifier);
+
// Create a HAContainerRegion
- HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+ HAContainerWrapper haContainerWrapper =
+ (HAContainerWrapper) cacheClientNotifier.getHaContainer();
// create message and HAEventWrapper
ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
@@ -137,6 +193,7 @@ public class HARegionQueueIntegrationTest {
new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem()));
HAEventWrapper wrapper = new HAEventWrapper(message);
wrapper.setHAContainer(haContainerWrapper);
+ wrapper.incrementPutInProgressCounter();
// Create and update HARegionQueues forcing one queue to startGiiQueueing
int numQueues = 10;
@@ -147,28 +204,33 @@ public class HARegionQueueIntegrationTest {
assertEquals(1, haContainerWrapper.size());
HAEventWrapper wrapperInContainer = (HAEventWrapper) haContainerWrapper.getKey(wrapper);
- assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+ assertEquals(numQueues - 1, wrapperInContainer.getReferenceCount());
+ assertTrue(wrapperInContainer.getPutInProgress());
- // Verify that the HAEventWrapper in the giiQueue now has msg = null
- // this gets set to null when wrapper is added to HAContainer (for non-gii queues)
+ // Verify that the HAEventWrapper in the giiQueue now has msg != null
+ // We don't null this out while putInProgress > 0 (true)
Queue giiQueue = targetQueue.getGiiQueue();
assertEquals(1, giiQueue.size());
+ // Simulate that we have iterated through all interested proxies
+ // and are now decrementing the PutInProgressCounter
+ wrapperInContainer.decrementPutInProgressCounter();
+
+ // Simulate that other queues have processed this event, then
+ // peek and process the event off the giiQueue
+ for (int i = 0; i < numQueues - 1; ++i) {
+ targetQueue.decAndRemoveFromHAContainer(wrapper);
+ }
+
HAEventWrapper giiQueueEntry = (HAEventWrapper) giiQueue.peek();
assertNotNull(giiQueueEntry);
- assertNull(giiQueueEntry.getClientUpdateMessage());
+ assertNotNull(giiQueueEntry.getClientUpdateMessage());
- // endGiiQueueing and verify queue empty and putEventInHARegion invoked with HAEventWrapper
- // not ClientUpdateMessageImpl
- HARegionQueue spyTargetQueue = spy(targetQueue);
- spyTargetQueue.endGiiQueueing();
+ // endGiiQueueing and verify queue and HAContainer are empty
+ targetQueue.endGiiQueueing();
assertEquals(0, giiQueue.size());
- ArgumentCaptor<Conflatable> eventCaptor = ArgumentCaptor.forClass(Conflatable.class);
- verify(spyTargetQueue).putEventInHARegion(eventCaptor.capture(), anyLong());
- Conflatable capturedEvent = eventCaptor.getValue();
- assertTrue(capturedEvent instanceof HAEventWrapper);
- assertNotNull(((HAEventWrapper) capturedEvent).getClientUpdateMessage());
+ Assert.assertEquals("Expected HAContainer to be empty", 0, haContainerWrapper.size());
}
@Test
@@ -235,11 +297,221 @@ public class HARegionQueueIntegrationTest {
verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
}
+ @Test
+ public void verifySimultaneousPutHAEventWrapperWithRegion() throws Exception {
+ HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+
+ final int numQueues = 30;
+ final int numOperations = 1000;
+
+ Set<HAEventWrapper> haEventWrappersToValidate =
+ createAndPutHARegionQueuesSimulataneously(haContainerWrapper, numQueues, numOperations);
+
+ assertEquals(numOperations, haContainerWrapper.size());
+
+ for (HAEventWrapper haEventWrapperToValidate : haEventWrappersToValidate) {
+ HAEventWrapper wrapperInContainer =
+ (HAEventWrapper) haContainerWrapper.getKey(haEventWrapperToValidate);
+ assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+ }
+ }
+
+ @Test
+ public void verifySequentialPutHAEventWrapperWithRegion() throws Exception {
+ HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+
+ ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+ new ClientProxyMembershipID(), new EventID(new byte[] {1}, 1, 2));
+ HAEventWrapper haEventWrapper = new HAEventWrapper(message);
+ haEventWrapper.setHAContainer(haContainerWrapper);
+
+ final int numQueues = 10;
+
+ createAndPutHARegionQueuesSequentially(haContainerWrapper, haEventWrapper, numQueues);
+
+ assertEquals(1, haContainerWrapper.size());
+
+ HAEventWrapper wrapperInContainer =
+ (HAEventWrapper) haContainerWrapper.getKey(haEventWrapper);
+ assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+ }
+
+ @Test
+ public void verifySimultaneousPutHAEventWrapperWithMap() throws Exception {
+ HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+ when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+ final int numQueues = 30;
+ final int numOperations = 1000;
+
+ Set<HAEventWrapper> haEventWrappersToValidate =
+ createAndPutHARegionQueuesSimulataneously(haContainerWrapper, numQueues, numOperations);
+
+ assertEquals(numOperations, haContainerWrapper.size());
+
+ for (HAEventWrapper haEventWrapperToValidate : haEventWrappersToValidate) {
+ HAEventWrapper wrapperInContainer =
+ (HAEventWrapper) haContainerWrapper.getKey(haEventWrapperToValidate);
+ assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+ }
+ }
+
+ @Test
+ public void verifySequentialPutHAEventWrapperWithMap() throws Exception {
+ HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+ when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+ ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+ new ClientProxyMembershipID(), new EventID(new byte[] {1}, 1, 2));
+ HAEventWrapper haEventWrapper = new HAEventWrapper(message);
+ haEventWrapper.setHAContainer(haContainerWrapper);
+
+ final int numQueues = 10;
+ createAndPutHARegionQueuesSequentially(haContainerWrapper, haEventWrapper, numQueues);
+
+ assertEquals(1, haContainerWrapper.size());
+
+ HAEventWrapper wrapperInContainer =
+ (HAEventWrapper) haContainerWrapper.getKey(haEventWrapper);
+ assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+ }
+
+ @Test
+ public void queueRemovalAndDispatchingConcurrently() throws Exception {
+ // Create a HAContainerMap to be used by the CacheClientNotifier
+ HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+ when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+ List<HARegionQueue> regionQueues = new ArrayList<>();
+
+ for (int i = 0; i < 2; ++i) {
+ HARegion haRegion = Mockito.mock(HARegion.class);
+ when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
+
+ ConcurrentHashMap<Object, Object> mockRegion = new ConcurrentHashMap<>();
+
+ when(haRegion.put(Mockito.any(Object.class), Mockito.any(Object.class))).then(answer -> {
+ Object existingValue = mockRegion.put(answer.getArgument(0), answer.getArgument(1));
+ return existingValue;
+ });
+
+ when(haRegion.get(Mockito.any(Object.class))).then(answer -> {
+ return mockRegion.get(answer.getArgument(0));
+ });
+
+ doAnswer(answer -> {
+ mockRegion.remove(answer.getArgument(0));
+ return null;
+ }).when(haRegion).localDestroy(Mockito.any(Object.class));
+
+ regionQueues.add(createHARegionQueue(haContainerWrapper, i, haRegion, false));
+ }
+
+ ExecutorService service = Executors.newFixedThreadPool(2);
+
+ List<Callable<Object>> callables = new ArrayList<>();
+
+ for (int i = 0; i < 10000; ++i) {
+ callables.clear();
+
+ EventID eventID = new EventID(new byte[] {1}, 1, i);
+
+ ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+ new ClientProxyMembershipID(), eventID);
+
+ HAEventWrapper wrapper = new HAEventWrapper(message);
+ wrapper.setHAContainer(haContainerWrapper);
+ wrapper.incrementPutInProgressCounter();
+
+ for (HARegionQueue queue : regionQueues) {
+ queue.put(wrapper);
+ }
+
+ wrapper.decrementPutInProgressCounter();
+
+ for (HARegionQueue queue : regionQueues) {
+ callables.add(Executors.callable(() -> {
+ try {
+ queue.peek();
+ queue.remove();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }));
+
+ callables.add(Executors.callable(() -> {
+ try {
+ queue.removeDispatchedEvents(eventID);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }));
+ }
+
+ // invokeAll() will wait until our two callables have completed
+ List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS);
+
+ for (Future<Object> future : futures) {
+ try {
+ future.get();
+ } catch (Exception ex) {
+ throw new TestException(
+ "Exception thrown while executing regionQueue methods concurrently on iteration: "
+ + i,
+ ex);
+ }
+ }
+ }
+ }
+
+ @Test
+ public void verifyPutEntryConditionallyInHAContainerNoOverwrite() throws Exception {
+ HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+
+ // create message and HAEventWrapper
+ EventID eventID = new EventID(cache.getDistributedSystem());
+ ClientUpdateMessage oldMessage = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+ new ClientProxyMembershipID(), eventID);
+ HAEventWrapper originalWrapperInstance = new HAEventWrapper(oldMessage);
+ originalWrapperInstance.incrementPutInProgressCounter();
+ originalWrapperInstance.setHAContainer(haContainerWrapper);
+
+ HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, 0);
+
+ haRegionQueue.putEventInHARegion(originalWrapperInstance, 1L);
+
+ // Simulate a QRM for this event
+ haRegionQueue.region.destroy(1L);
+ haRegionQueue.decAndRemoveFromHAContainer(originalWrapperInstance);
+
+ ClientUpdateMessage newMessage = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+ new ClientProxyMembershipID(), eventID);
+ HAEventWrapper newWrapperInstance = new HAEventWrapper(newMessage);
+ newWrapperInstance.incrementPutInProgressCounter();
+ newWrapperInstance.setHAContainer(haContainerWrapper);
+
+ haRegionQueue.putEventInHARegion(newWrapperInstance, 1L);
+
+ // Add the original wrapper back in, and verify that it does not overwrite the new one
+ // and that it increments the ref count on the container key.
+ haRegionQueue.putEventInHARegion(originalWrapperInstance, 1L);
+
+ Assert.assertEquals("Original message overwrote new message in container",
+ haContainerWrapper.get(originalWrapperInstance),
+ newWrapperInstance.getClientUpdateMessage());
+ Assert.assertEquals("Reference count was not the expected value", 2,
+ newWrapperInstance.getReferenceCount());
+ Assert.assertEquals("Container size was not the expected value", haContainerWrapper.size(), 1);
+ }
+
private HAContainerRegion createHAContainerRegion() throws Exception {
- // Create a Region to be used by the HAContainerRegion
Region haContainerRegionRegion = createHAContainerRegionRegion();
- // Create an HAContainerRegion
HAContainerRegion haContainerRegion = new HAContainerRegion(haContainerRegionRegion);
return haContainerRegion;
@@ -261,25 +533,27 @@ public class HARegionQueueIntegrationTest {
return region;
}
- private HARegionQueue createHARegionQueue(Map haContainer, int index) throws Exception {
+ private HARegionQueue createHARegionQueue(Map haContainer, int index, HARegion haRegion,
+ boolean puttingGIIDataInQueue) throws Exception {
StoppableReentrantReadWriteLock giiLock = Mockito.mock(StoppableReentrantReadWriteLock.class);
- when(giiLock.writeLock())
- .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class));
- when(giiLock.readLock())
- .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
+ doReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class)).when(giiLock)
+ .writeLock();
+ doReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class)).when(giiLock)
+ .readLock();
- StoppableReentrantReadWriteLock rwLock = Mockito.mock(StoppableReentrantReadWriteLock.class);
- when(rwLock.writeLock())
- .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class));
- when(rwLock.readLock())
- .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
+ StoppableReentrantReadWriteLock rwLock =
+ new StoppableReentrantReadWriteLock(cache.getCancelCriterion());
+ return new HARegionQueue("haRegion+" + index, haRegion, (InternalCache) cache, haContainer,
+ null, (byte) 1, true, mock(HARegionQueueStats.class), giiLock, rwLock,
+ mock(CancelCriterion.class), puttingGIIDataInQueue);
+ }
+
+ private HARegionQueue createHARegionQueue(Map haContainer, int index) throws Exception {
HARegion haRegion = Mockito.mock(HARegion.class);
when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
- return new HARegionQueue("haRegion+" + index, haRegion, (InternalCache) cache, haContainer,
- null, (byte) 1, true, mock(HARegionQueueStats.class), giiLock, rwLock,
- mock(CancelCriterion.class), false);
+ return createHARegionQueue(haContainer, index, haRegion, false);
}
private CachedDeserializable createCachedDeserializable(HAContainerWrapper haContainerWrapper)
@@ -319,19 +593,97 @@ public class HARegionQueueIntegrationTest {
// create HARegionQueues and startGiiQueuing on a region about half way through
for (int i = 0; i < numQueues; i++) {
- HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, i);
+ HARegionQueue haRegionQueue = null;
// start GII Queueing (targetRegionQueue)
if (i == startGiiQueueingIndex) {
- targetQueue = haRegionQueue;
- targetQueue.startGiiQueueing();
+ HARegion haRegion = Mockito.mock(HARegion.class);
+
+ final HARegionQueue giiHaRegionQueue =
+ createHARegionQueue(haContainerWrapper, i, haRegion, false);;
+ giiHaRegionQueue.startGiiQueueing();
+ targetQueue = giiHaRegionQueue;
+
+ when(haRegion.put(Mockito.any(Object.class), Mockito.any(HAEventWrapper.class)))
+ .then(answer -> {
+ // Simulate that either a QRM or message dispatch has occurred immediately after the
+ // put.
+ // We want to ensure that the event is removed from the HAContainer if it is drained
+ // from the giiQueue
+ // and the ref count has dropped to 0.
+ HAEventWrapper haContainerKey = answer.getArgument(1);
+ giiHaRegionQueue.decAndRemoveFromHAContainer(haContainerKey);
+ return null;
+ });
+
+ when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
+
+ haRegionQueue = giiHaRegionQueue;
+ } else {
+ haRegionQueue = createHARegionQueue(haContainerWrapper, i);
}
haRegionQueue.put(wrapper);
}
+
return targetQueue;
}
+ private Set<HAEventWrapper> createAndPutHARegionQueuesSimulataneously(
+ HAContainerWrapper haContainerWrapper, int numQueues, int numOperations) throws Exception {
+ ConcurrentLinkedQueue<HARegionQueue> queues = new ConcurrentLinkedQueue<>();
+ final ConcurrentHashSet<HAEventWrapper> testValidationWrapperSet = new ConcurrentHashSet<>();
+ final AtomicInteger count = new AtomicInteger();
+
+ // create HARegionQueuesv
+ for (int i = 0; i < numQueues; i++) {
+ queues.add(createHARegionQueue(haContainerWrapper, i));
+ }
+
+ for (int i = 0; i < numOperations; i++) {
+ count.set(i);
+
+ ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_CREATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+ new ClientProxyMembershipID(), new EventID(new byte[] {1}, 1, count.get()));
+
+ queues.parallelStream().forEach(haRegionQueue -> {
+ try {
+ // In production (CacheClientNotifier.singletonRouteClientMessage), each queue has its
+ // own HAEventWrapper object even though they hold the same ClientUpdateMessage,
+ // so we create an object for each queue in here
+ HAEventWrapper haEventWrapper = new HAEventWrapper(message);
+
+ testValidationWrapperSet.add(haEventWrapper);
+
+ haRegionQueue.put(haEventWrapper);
+ } catch (InterruptedException iex) {
+ throw new RuntimeException(iex);
+ }
+ });
+ }
+
+ return testValidationWrapperSet;
+ }
+
+ private void createAndPutHARegionQueuesSequentially(HAContainerWrapper haContainerWrapper,
+ HAEventWrapper haEventWrapper, int numQueues) throws Exception {
+ ArrayList<HARegionQueue> queues = new ArrayList<>();
+
+ // create HARegionQueues
+ for (int i = 0; i < numQueues; i++) {
+ queues.add(createHARegionQueue(haContainerWrapper, i));
+ }
+
+ haEventWrapper.incrementPutInProgressCounter();
+
+ for (HARegionQueue queue : queues) {
+ queue.put(haEventWrapper);
+ }
+
+ haEventWrapper.decrementPutInProgressCounter();
+ }
+
private void createAndUpdateHARegionQueuesSimultaneously(HAContainerWrapper haContainerWrapper,
CachedDeserializable cd, int numQueues) throws Exception {
// Create some HARegionQueues
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index 350a91e..e1baf6c 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -25,22 +25,34 @@ import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.awaitility.Awaitility;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -48,6 +60,9 @@ import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.junit.experimental.categories.Category;
import org.junit.rules.ErrorCollector;
import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheFactory;
@@ -60,6 +75,10 @@ import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
+import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
import org.apache.geode.test.dunit.ThreadUtils;
import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
@@ -1366,6 +1385,396 @@ public class HARegionQueueJUnitTest {
HARegionQueue.getMessageSyncInterval(), is(updatedMessageSyncInterval)));
}
+ @Test
+ public void testPutEventInHARegion_Conflatable() throws Exception {
+ HARegionQueue regionQueue = createHARegionQueue(testName.getMethodName());
+
+ Conflatable expectedConflatable =
+ new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true,
+ testName.getMethodName());
+
+ final long position = 0L;
+ Conflatable returnedConflatable = regionQueue.putEventInHARegion(expectedConflatable, position);
+
+ Assert.assertEquals(expectedConflatable, returnedConflatable);
+
+ Conflatable conflatableInRegion = (Conflatable) regionQueue.getRegion().get(position);
+
+ Assert.assertEquals(expectedConflatable, conflatableInRegion);
+ }
+
+ @Test
+ public void testPutEventInHARegion_HAEventWrapper_New() throws Exception {
+ HARegionQueue regionQueue =
+ createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+ HARegionQueue regionQueueSpy = Mockito.spy(regionQueue);
+
+ HAEventWrapper newHAEventWrapper = new HAEventWrapper(mock(EventID.class));
+
+ doReturn(newHAEventWrapper).when(regionQueueSpy)
+ .putEntryConditionallyIntoHAContainer(newHAEventWrapper);
+
+ final long position = 0L;
+ Conflatable returnedHAEventWrapper =
+ regionQueueSpy.putEventInHARegion(newHAEventWrapper, position);
+
+ Assert.assertEquals(newHAEventWrapper, returnedHAEventWrapper);
+
+ HAEventWrapper haEventWrapperInRegion =
+ (HAEventWrapper) regionQueueSpy.getRegion().get(position);
+
+ Assert.assertEquals(newHAEventWrapper, haEventWrapperInRegion);
+ verify(regionQueueSpy, times(1)).putEntryConditionallyIntoHAContainer(newHAEventWrapper);
+ }
+
+ @Test
+ public void testPutEventInHARegion_HAEventWrapper_EntryAlreadyExisted() throws Exception {
+ HARegionQueue regionQueue =
+ createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+ HARegionQueue regionQueueSpy = Mockito.spy(regionQueue);
+
+ // Mock out an existing entry and increment ref count as if it had already been added to the HA
+ // container
+ HAEventWrapper existingHAEventWrapper = new HAEventWrapper(mock(EventID.class));
+ HAEventWrapper newHAEventWrapper = new HAEventWrapper(mock(EventID.class));
+
+ doReturn(existingHAEventWrapper).when(regionQueueSpy)
+ .putEntryConditionallyIntoHAContainer(newHAEventWrapper);
+
+ final long position = 0L;
+ Conflatable returnedHAEventWrapper =
+ regionQueueSpy.putEventInHARegion(newHAEventWrapper, position);
+
+ Assert.assertEquals(existingHAEventWrapper, returnedHAEventWrapper);
+
+ HAEventWrapper haEventWrapperInRegion =
+ (HAEventWrapper) regionQueueSpy.getRegion().get(position);
+
+ Assert.assertEquals(existingHAEventWrapper, haEventWrapperInRegion);
+ verify(regionQueueSpy, times(1)).putEntryConditionallyIntoHAContainer(newHAEventWrapper);
+ }
+
+ @Test
+ public void testPutEventInHARegion_HAEventWrapper_QueueNotInitialized() throws Exception {
+ HARegionQueue regionQueue =
+ createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+
+ // Mock that the regionQueue is not yet initialized
+ regionQueue.initialized.set(false);
+
+ HARegionQueue regionQueueSpy = Mockito.spy(regionQueue);
+ // Mock out an existing entry and increment ref count as if it had already been added to the HA
+ // container
+ HAEventWrapper expectedHAEventWrapper = new HAEventWrapper(mock(EventID.class));
+
+ final long position = 0L;
+ Conflatable returnedHAEventWrapper =
+ regionQueueSpy.putEventInHARegion(expectedHAEventWrapper, position);
+
+ Assert.assertEquals(expectedHAEventWrapper, returnedHAEventWrapper);
+
+ HAEventWrapper haEventWrapperInRegion =
+ (HAEventWrapper) regionQueueSpy.getRegion().get(position);
+
+ Assert.assertEquals(expectedHAEventWrapper, haEventWrapperInRegion);
+
+ // putEntryConditionallyIntoHAContainer should not be called if the queue isn't yet initialized
+ verify(regionQueueSpy, times(0)).putEntryConditionallyIntoHAContainer(expectedHAEventWrapper);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPutEventInHARegion_HAEventWrapper_NullClientUpdateMessage() throws Exception {
+ HARegionQueue regionQueue =
+ createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+
+ HAEventWrapper haEventWrapperWithNullCUMI = new HAEventWrapper(mock(EventID.class));
+ haEventWrapperWithNullCUMI.setClientUpdateMessage(null);
+
+ final long position = 0L;
+ regionQueue.putEventInHARegion(haEventWrapperWithNullCUMI, 0L);
+ }
+
+ @Test
+ public void testPutEntryConditionallyIntoHAContainer_MultipleThreads_SameWrapperInstanceAndCorrectRefCount()
+ throws Exception {
+ HARegionQueue regionQueue =
+ createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+
+ HAEventWrapper mockHaEventWrapper = mock(HAEventWrapper.class);
+ doReturn(true).when(mockHaEventWrapper).getPutInProgress();
+
+ ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+ doReturn(mockClientUpdateMessage).when(mockHaEventWrapper).getClientUpdateMessage();
+
+ int numClients = 100;
+ ExecutorService executorService = Executors.newFixedThreadPool(numClients);
+
+ Collection<Callable<Conflatable>> concurrentPuts =
+ Collections.nCopies(numClients, (Callable<Conflatable>) () -> regionQueue
+ .putEntryConditionallyIntoHAContainer(mockHaEventWrapper));
+
+ List<Future<Conflatable>> futures = executorService.invokeAll(concurrentPuts);
+
+ List<Conflatable> conflatables = new ArrayList<>();
+
+ for (Future future : futures) {
+ conflatables.add((Conflatable) future.get());
+ }
+
+ boolean areAllConflatablesEqual = conflatables.stream().allMatch(conflatables.get(0)::equals);
+
+ Assert.assertTrue(areAllConflatablesEqual);
+ verify(mockHaEventWrapper, times(numClients)).incAndGetReferenceCount();
+ Assert.assertEquals(regionQueue.haContainer.get(mockHaEventWrapper), mockClientUpdateMessage);
+ }
+
+ @Test
+ public void testPutEntryConditionallyIntoHAContainer_AddCQAndInterestList() throws Exception {
+ final String haRegionName = testName.getMethodName();
+
+ HARegionQueue regionQueue =
+ createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+ ClientProxyMembershipID mockClientProxyMembershipId = mock(ClientProxyMembershipID.class);
+ CacheClientProxy mockCacheClientProxy = mock(CacheClientProxy.class);
+
+ doReturn(mockClientProxyMembershipId).when(mockCacheClientProxy).getProxyID();
+ ((HAContainerWrapper) regionQueue.haContainer).putProxy(haRegionName, mockCacheClientProxy);
+
+ ClientUpdateMessageImpl.ClientCqConcurrentMap mockClientCqConcurrentMap =
+ mock(ClientUpdateMessageImpl.ClientCqConcurrentMap.class);
+ ClientUpdateMessageImpl.CqNameToOp mockCqNameToOp =
+ mock(ClientUpdateMessageImpl.CqNameToOp.class);
+
+ doReturn(mockCqNameToOp).when(mockClientCqConcurrentMap).get(mockClientProxyMembershipId);
+
+ ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+ doReturn(true).when(mockClientUpdateMessage)
+ .isClientInterestedInUpdates(mockClientProxyMembershipId);
+
+ HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class);
+ doReturn(mockClientUpdateMessage).when(mockHAEventWrapper).getClientUpdateMessage();
+ doReturn(mockClientCqConcurrentMap).when(mockHAEventWrapper).getClientCqs();
+
+ // Mock that a put is in progress so we don't null out the
+ // ClientUpdateMessage member on the HAEventWrapper
+ mockHAEventWrapper.incrementPutInProgressCounter();
+
+ // TODO: Why don't we add CQs and Interest when we initially add the
+ // wrapper to the container?
+ regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);
+ regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);
+
+ verify(mockClientUpdateMessage, times(1)).addClientCqs(mockClientProxyMembershipId,
+ mockCqNameToOp);
+ verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId,
+ true);
+
+ // Mock that the ClientUpdateMessage is only interested in invalidates, then do another put
+ doReturn(false).when(mockClientUpdateMessage)
+ .isClientInterestedInUpdates(mockClientProxyMembershipId);
+ doReturn(true).when(mockClientUpdateMessage)
+ .isClientInterestedInInvalidates(mockClientProxyMembershipId);
+
+ regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);
+
+ verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId,
+ false);
+ }
+
+ @Test
+ public void testDecAndRemoveFromHAContainer_WrapperInContainer() throws Exception {
+ final String haRegionName = testName.getMethodName();
+
+ HARegionQueue regionQueue =
+ createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+ HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class);
+ HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class);
+
+ doReturn(mockHAEventWrapper).when(mockHAContainer).getKey(mockHAEventWrapper);
+
+ regionQueue.haContainer = mockHAContainer;
+
+ regionQueue.decAndRemoveFromHAContainer(mockHAEventWrapper);
+
+ verify(mockHAContainer, times(1)).remove(mockHAEventWrapper);
+ }
+
+ @Test
+ public void testDecAndRemoveFromHAContainer_RemoteWrapperNotInContainer_Removed()
+ throws Exception {
+ final String haRegionName = testName.getMethodName();
+
+ HARegionQueue regionQueue =
+ createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+ HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class);
+ HAEventWrapper mockHAEventWrapperInContainer = mock(HAEventWrapper.class);
+ HAEventWrapper mockRemoteHAEventWrapper = mock(HAEventWrapper.class);
+
+ doReturn(mockHAEventWrapperInContainer).when(mockHAContainer).getKey(mockRemoteHAEventWrapper);
+
+ regionQueue.haContainer = mockHAContainer;
+
+ regionQueue.decAndRemoveFromHAContainer(mockRemoteHAEventWrapper);
+
+ verify(mockHAContainer, times(1)).remove(mockHAEventWrapperInContainer);
+ }
+
+ @Test
+ public void testDecAndRemoveFromHAContainer_DecrementedButNotRemoved() throws Exception {
+ final String haRegionName = testName.getMethodName();
+
+ HARegionQueue regionQueue =
+ createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+ HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class);
+ HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class);
+
+ doReturn(mockHAEventWrapper).when(mockHAContainer).getKey(mockHAEventWrapper);
+ doReturn(1L).when(mockHAEventWrapper).decAndGetReferenceCount();
+
+ regionQueue.haContainer = mockHAContainer;
+
+ regionQueue.decAndRemoveFromHAContainer(mockHAEventWrapper);
+
+ verify(mockHAEventWrapper, times(1)).decAndGetReferenceCount();
+ verify(mockHAContainer, times(0)).remove(mockHAEventWrapper);
+ }
+
+ @Test
+ public void testDecAndRemoveFromHAContainer_AlreadyRemoved() throws Exception {
+ final String haRegionName = testName.getMethodName();
+
+ HARegionQueue regionQueue =
+ createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+ HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class);
+ HAEventWrapper mockHAEventWrapperInContainer = mock(HAEventWrapper.class);
+ HAEventWrapper mockRemoteHAEventWrapper = mock(HAEventWrapper.class);
+
+ doReturn(null).when(mockHAContainer).getKey(mockRemoteHAEventWrapper);
+
+ regionQueue.haContainer = mockHAContainer;
+
+ regionQueue.decAndRemoveFromHAContainer(mockRemoteHAEventWrapper);
+
+ verify(mockHAContainer, times(0)).remove(mockHAEventWrapperInContainer);
+ verify(mockHAEventWrapperInContainer, times(0)).decAndGetReferenceCount();
+ }
+
+ @Test
+ public void testDecAndRemoveFromHAContainer_RefChangedAfterGettingKey() throws Exception {
+ final String haRegionName = testName.getMethodName();
+
+ HARegionQueue regionQueue =
+ createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+ HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class);
+ HAEventWrapper mockOriginalHAEventWrapperInContainer = mock(HAEventWrapper.class);
+ HAEventWrapper mockNewHAEventWrapperInContainer = mock(HAEventWrapper.class);
+ HAEventWrapper mockRemoteHAEventWrapper = mock(HAEventWrapper.class);
+
+ // First call will return original wrapper in container, then second call will return a new one
+ // to simulate the key being replaced by a new one in a different thread
+ doReturn(mockOriginalHAEventWrapperInContainer)
+ .doReturn(mockNewHAEventWrapperInContainer)
+ .when(mockHAContainer).getKey(mockRemoteHAEventWrapper);
+
+ regionQueue.haContainer = mockHAContainer;
+
+ regionQueue.decAndRemoveFromHAContainer(mockRemoteHAEventWrapper);
+
+ verify(mockHAContainer, times(0)).remove(mockOriginalHAEventWrapperInContainer);
+ verify(mockOriginalHAEventWrapperInContainer, times(0)).decAndGetReferenceCount();
+ verify(mockHAContainer, times(1)).remove(mockNewHAEventWrapperInContainer);
+ verify(mockNewHAEventWrapperInContainer, times(1)).decAndGetReferenceCount();
+ }
+
+ @Test
+ public void testDecAndRemoveFromHAContainer_MultipleThreadsDecrementing() throws Exception {
+ HARegionQueue regionQueue =
+ createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+
+ HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class);
+
+ HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class);
+ doReturn(mockHAEventWrapper).when(mockHAContainer).getKey(mockHAEventWrapper);
+ regionQueue.haContainer = mockHAContainer;
+
+ final int numClients = 100;
+
+ doAnswer(new Answer() {
+ private long mockRefCount = numClients;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ return --mockRefCount;
+ }
+ }).when(mockHAEventWrapper).decAndGetReferenceCount();
+
+ ExecutorService executorService = Executors.newFixedThreadPool(numClients);
+
+ Collection<Callable<Void>> concurrentDecAndRemoves =
+ Collections.nCopies(numClients, (Callable<Void>) () -> {
+ regionQueue
+ .decAndRemoveFromHAContainer(mockHAEventWrapper);
+ return null;
+ });
+
+ List<Future<Void>> futures = executorService.invokeAll(concurrentDecAndRemoves);
+
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+
+ verify(mockHAEventWrapper, times(numClients)).decAndGetReferenceCount();
+ verify(mockHAContainer, times(1)).remove(mockHAEventWrapper);
+ }
+
+ @Test
+ public void testPutEntryConditionallyIntoHAContainerUpdatesInterestList() throws Exception {
+ final String haRegionName = testName.getMethodName();
+
+ HARegionQueue regionQueue =
+ createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+ ClientProxyMembershipID mockClientProxyMembershipId = mock(ClientProxyMembershipID.class);
+ CacheClientProxy mockCacheClientProxy = mock(CacheClientProxy.class);
+
+ doReturn(mockClientProxyMembershipId).when(mockCacheClientProxy).getProxyID();
+ ((HAContainerWrapper) regionQueue.haContainer).putProxy(haRegionName, mockCacheClientProxy);
+
+ EventID mockEventID = mock(EventID.class);
+ ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+ mockClientUpdateMessage.setEventIdentifier(mockEventID);
+
+ doReturn(true).when(mockClientUpdateMessage)
+ .isClientInterestedInUpdates(mockClientProxyMembershipId);
+
+ HAEventWrapper originalHAEventWrapper = new HAEventWrapper(mockEventID);
+ originalHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage);
+
+ // allow putInProgress to be false (so we null out the msg field in the wrapper)
+ regionQueue.putEntryConditionallyIntoHAContainer(originalHAEventWrapper);
+
+ // the initial haContainer.putIfAbsent() doesn't need to invoke addClientInterestList
+ // as it is already part of the original message
+ verify(mockClientUpdateMessage, times(0)).addClientInterestList(mockClientProxyMembershipId,
+ true);
+
+ // create a new wrapper with the same id and message
+ HAEventWrapper newHAEventWrapper = new HAEventWrapper(mockEventID);
+ newHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage);
+
+ regionQueue.putEntryConditionallyIntoHAContainer(newHAEventWrapper);
+
+ // Verify that the original haContainerEntry gets the updated clientInterestList
+ verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId,
+ true);
+ }
+
/**
* Wait until a given runnable stops throwing exceptions. It should take at least
* minimumElapsedTime after the supplied start time to happen.
@@ -1447,7 +1856,15 @@ public class HARegionQueueJUnitTest {
}
/**
- * Creates region-queue object
+ * Creates HA region-queue object with specified queue type
+ */
+ private HARegionQueue createHARegionQueue(String name, int queueType)
+ throws IOException, ClassNotFoundException, CacheException, InterruptedException {
+ return HARegionQueue.getHARegionQueueInstance(name, cache, queueType, false);
+ }
+
+ /**
+ * Creates region-queue object with specified HARegionQueueAttributes
*/
HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs)
throws IOException, ClassNotFoundException, CacheException, InterruptedException {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierIntegrationTest.java
new file mode 100644
index 0000000..07073d5
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierIntegrationTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.shiro.subject.Subject;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsType;
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DSClock;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.SystemTimer;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.FilterProfile;
+import org.apache.geode.internal.cache.FilterRoutingInfo;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegionArguments;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category({ClientSubscriptionTest.class})
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({CacheClientProxy.class, DataSerializer.class})
+public class CacheClientNotifierIntegrationTest {
+ @Test
+ public void testCacheClientNotifier_NotifyClients_QRMCausesPrematureRemovalFromHAContainer()
+ throws Exception {
+ final CountDownLatch messageDispatcherInitLatch = new CountDownLatch(1);
+ final CountDownLatch notifyClientLatch = new CountDownLatch(1);
+
+ setupMockMessageDispatcher(messageDispatcherInitLatch, notifyClientLatch);
+
+ InternalCache mockInternalCache = createMockInternalCache();
+
+ CacheClientNotifier cacheClientNotifier =
+ CacheClientNotifier.getInstance(mockInternalCache, mock(CacheServerStats.class),
+ 100000, 100000, mock(ConnectionListener.class), null, false);
+
+ final String mockRegionNameProxyOne = "mockHARegionProxyOne";
+ final String mockRegionNameProxyTwo = "mockHARegionProxyTwo";
+
+ CacheClientProxy cacheClientProxyOne =
+ createMockCacheClientProxy(cacheClientNotifier, mockRegionNameProxyOne);
+ CacheClientProxy cacheClientProxyTwo =
+ createMockCacheClientProxy(cacheClientNotifier, mockRegionNameProxyTwo);
+
+ createMockHARegion(mockInternalCache, cacheClientProxyOne, mockRegionNameProxyOne, true);
+ createMockHARegion(mockInternalCache, cacheClientProxyTwo, mockRegionNameProxyTwo, false);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+
+ Collection<Callable<Void>> initAndNotifyTasks = new ArrayList<>();
+
+ // On one thread, we are initializing the message dispatchers for the two CacheClientProxy
+ // objects. For the second client's initialization,
+ // we block until we can simulate a QRM message while the event sits in the CacheClientProxy's
+ // queuedEvents collection.
+ // This blocking logic can be found in setupMockMessageDispatcher().
+ ((ArrayList<Callable<Void>>) initAndNotifyTasks).add((Callable<Void>) () -> {
+ cacheClientProxyOne.initializeMessageDispatcher();
+ cacheClientProxyTwo.initializeMessageDispatcher();
+ return null;
+ });
+
+ EntryEventImpl mockEntryEventImpl = createMockEntryEvent(new ArrayList<CacheClientProxy>() {
+ {
+ add(cacheClientProxyOne);
+ add(cacheClientProxyTwo);
+ }
+ });
+
+ // On a second thread, we are notifying clients of our mocked events. For the first client, we
+ // wait for the message dispatcher to be
+ // initialized so the event is put/processed. For the second client, we ensure that the
+ // dispatcher is still initializing so we put the event
+ // in the CacheClientProxy's queuedEvents collection. When the event is handled by the first
+ // CacheClientProxy, we mocked a QRM in the HARegion
+ // put to simulate a QRM message being received at that time. The simulated QRM logic can be
+ // found in the createMockHARegion() method.
+ ((ArrayList<Callable<Void>>) initAndNotifyTasks).add((Callable<Void>) () -> {
+ messageDispatcherInitLatch.await();
+ CacheClientNotifier.notifyClients(mockEntryEventImpl);
+ notifyClientLatch.countDown();
+ return null;
+ });
+
+ List<Future<Void>> futures = executorService.invokeAll(initAndNotifyTasks);
+
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+
+ // Verify that we do not hang in peek() for the second proxy due to the wrapper
+ Awaitility.waitAtMost(new Duration(30, TimeUnit.SECONDS)).until(() -> {
+ try {
+ Object eventPeeked = null;
+ while (eventPeeked == null) {
+ // Simulating message dispatching. We peek() and remove() but aren't testing
+ // the actual message delivery for this test.
+ eventPeeked = cacheClientProxyTwo.getHARegionQueue().peek();
+ if (eventPeeked != null) {
+ cacheClientProxyTwo.getHARegionQueue().remove();
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ throw (new RuntimeException(e));
+ }
+ });
+
+ Assert.assertEquals("Expected the HAContainer to be empty", 0,
+ cacheClientNotifier.getHaContainer().size());
+ }
+
+ private HARegion createMockHARegion(InternalCache internalCache,
+ CacheClientProxy cacheClientProxy,
+ String haRegionName,
+ boolean simulateQrm)
+ throws IOException, ClassNotFoundException {
+ HARegion mockHARegion = mock(HARegion.class);
+
+ doReturn(internalCache).when(mockHARegion).getCache();
+ doReturn(internalCache).when(mockHARegion).getGemFireCache();
+ doReturn(mock(CancelCriterion.class)).when(mockHARegion).getCancelCriterion();
+ doReturn(mock(AttributesMutator.class)).when(mockHARegion).getAttributesMutator();
+ doReturn(mockHARegion).when(internalCache).createVMRegion(eq(haRegionName),
+ any(RegionAttributes.class), any(InternalRegionArguments.class));
+
+ // We use a mock of the HARegion.put() method to simulate an queue removal message
+ // immediately after the event was successfully put. In production when a queue removal takes
+ // place at this time,
+ // it will decrement the ref count on the HAEventWrapper and potentially make it eligible for
+ // removal later on
+ // when CacheClientNotifier.checkAndRemoveFromClientMsgsRegion() is called.
+
+ Map<Object, Object> events = new HashMap<Object, Object>();
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ long position = invocation.getArgument(0);
+ HAEventWrapper haEventWrapper = invocation.getArgument(1);
+
+ if (simulateQrm) {
+ // This call is ultimately what a QRM message will do when it is processed, so we simulate
+ // that here.
+ cacheClientProxy.getHARegionQueue().destroyFromAvailableIDs(position);
+ events.remove(position);
+ cacheClientProxy.getHARegionQueue()
+ .decAndRemoveFromHAContainer((HAEventWrapper) haEventWrapper);
+ } else {
+ events.put(position, haEventWrapper);
+ }
+
+ return null;
+ }
+ }).when(mockHARegion).put(any(long.class), any(HAEventWrapper.class));
+
+ // This is so that when peek() is called, the object is returned. Later we want to verify that
+ // it was successfully "delivered" to the client and subsequently removed from the HARegion.
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ return events.get(invocation.getArgument(0));
+ }
+ }).when(mockHARegion).get(any(long.class));
+
+ return mockHARegion;
+ }
+
+ private InternalCache createMockInternalCache() {
+ InternalCache mockInternalCache = mock(InternalCache.class);
+ doReturn(mock(SystemTimer.class)).when(mockInternalCache).getCCPTimer();
+ doReturn(mock(CancelCriterion.class)).when(mockInternalCache).getCancelCriterion();
+
+ InternalDistributedSystem mockInteralDistributedSystem = createMockInternalDistributedSystem();
+ doReturn(mockInteralDistributedSystem).when(mockInternalCache).getInternalDistributedSystem();
+ doReturn(mockInteralDistributedSystem).when(mockInternalCache).getDistributedSystem();
+
+ return mockInternalCache;
+ }
+
+ private void setupMockMessageDispatcher(CountDownLatch messageDispatcherInitLatch,
+ CountDownLatch notifyClientLatch) throws Exception {
+ PowerMockito.whenNew(CacheClientProxy.MessageDispatcher.class).withAnyArguments()
+ .thenAnswer(new Answer<CacheClientProxy.MessageDispatcher>() {
+ private boolean secondClient = false;
+
+ @Override
+ public CacheClientProxy.MessageDispatcher answer(
+ org.mockito.invocation.InvocationOnMock invocation) throws Throwable {
+ if (secondClient) {
+ messageDispatcherInitLatch.countDown();
+ notifyClientLatch.await();
+ }
+
+ secondClient = true;
+
+ CacheClientProxy cacheClientProxy = invocation.getArgument(0);
+ String name = invocation.getArgument(1);
+
+ CacheClientProxy.MessageDispatcher messageDispatcher =
+ new CacheClientProxy.MessageDispatcher(cacheClientProxy,
+ name);
+
+ return messageDispatcher;
+ }
+ });
+
+ PowerMockito.mockStatic(InternalDataSerializer.class);
+ }
+
+ private InternalDistributedSystem createMockInternalDistributedSystem() {
+ InternalDistributedSystem mockInternalDistributedSystem = mock(InternalDistributedSystem.class);
+ DistributionManager mockDistributionManager = mock(DistributionManager.class);
+
+ doReturn(mock(InternalDistributedMember.class)).when(mockInternalDistributedSystem)
+ .getDistributedMember();
+ doReturn(mock(Statistics.class)).when(mockInternalDistributedSystem)
+ .createAtomicStatistics(any(StatisticsType.class), any(String.class));
+ doReturn(mock(DistributionConfig.class)).when(mockDistributionManager).getConfig();
+ doReturn(mockDistributionManager).when(mockInternalDistributedSystem).getDistributionManager();
+ doReturn(mock(DSClock.class)).when(mockInternalDistributedSystem).getClock();
+
+ return mockInternalDistributedSystem;
+ }
+
+ private CacheClientProxy createMockCacheClientProxy(CacheClientNotifier cacheClientNotifier,
+ String haRegionName)
+ throws IOException {
+ Socket mockSocket = mock(Socket.class);
+ doReturn(mock(InetAddress.class)).when(mockSocket).getInetAddress();
+
+ ClientProxyMembershipID mockClientProxyMembershipID = mock(ClientProxyMembershipID.class);
+ doReturn(mock(DistributedMember.class)).when(mockClientProxyMembershipID)
+ .getDistributedMember();
+ doReturn(haRegionName).when(mockClientProxyMembershipID).getHARegionName();
+
+ CacheClientProxy cacheClientProxy =
+ new CacheClientProxy(cacheClientNotifier, mockSocket, mockClientProxyMembershipID, true,
+ (byte) 0, Version.GFE_58, 0, true, mock(SecurityService.class), mock(Subject.class));
+
+ cacheClientNotifier.addClientInitProxy(cacheClientProxy);
+
+ return cacheClientProxy;
+ }
+
+ private EntryEventImpl createMockEntryEvent(List<CacheClientProxy> proxies) {
+ FilterRoutingInfo.FilterInfo mockFilterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+ Set mockInterestedClients = mock(Set.class);
+ doReturn(mockInterestedClients).when(mockFilterInfo).getInterestedClients();
+ doReturn(null).when(mockFilterInfo).getInterestedClientsInv();
+
+ EntryEventImpl mockEntryEventImpl = mock(EntryEventImpl.class);
+ doReturn(mockFilterInfo).when(mockEntryEventImpl).getLocalFilterInfo();
+
+ FilterProfile mockFilterProfile = mock(FilterProfile.class);
+ Set mockRealClientIDs = new HashSet<ClientProxyMembershipID>();
+
+ for (CacheClientProxy proxy : proxies) {
+ mockRealClientIDs.add(proxy.getProxyID());
+ }
+
+ doReturn(mockRealClientIDs).when(mockFilterProfile).getRealClientIDs(any(Collection.class));
+
+ LocalRegion mockLocalRegion = mock(LocalRegion.class);
+ doReturn(mockFilterProfile).when(mockLocalRegion).getFilterProfile();
+
+ doReturn(mockLocalRegion).when(mockEntryEventImpl).getRegion();
+ doReturn(EnumListenerEvent.AFTER_CREATE).when(mockEntryEventImpl).getEventType();
+ doReturn(Operation.CREATE).when(mockEntryEventImpl).getOperation();
+ EventID mockEventId = mock(EventID.class);
+ doReturn(new byte[] {1}).when(mockEventId).getMembershipID();
+ doReturn(mockEventId).when(mockEntryEventImpl).getEventId();
+
+ return mockEntryEventImpl;
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index cafc43f..ad1baab 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -192,7 +192,7 @@ public class HARegion extends DistributedRegion {
// <HA overflow>
if (conflatable instanceof HAEventWrapper) {
- this.owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable);
+ this.owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Invalidate");
}
// </HA overflow>
// update the stats
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HAContainerMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HAContainerMap.java
index 99b40ea..9a8ed7d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HAContainerMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HAContainerMap.java
@@ -53,8 +53,6 @@ public class HAContainerMap implements HAContainerWrapper {
}
public Object putProxy(String haName, CacheClientProxy proxy) {
- // InternalDistributedSystem.getLoggerI18n().info(LocalizedStrings.DEBUG, "adding proxy for " +
- // haName + ": " + proxy, new Exception("stack trace"));
return haRegionNameToProxy.put(haName, proxy);
}
@@ -63,8 +61,6 @@ public class HAContainerMap implements HAContainerWrapper {
}
public Object removeProxy(String haName) {
- // InternalDistributedSystem.getLoggerI18n().info(LocalizedStrings.DEBUG, "removing proxy for "
- // + haName, new Exception("stack trace"));
return haRegionNameToProxy.remove(haName);
}
@@ -91,12 +87,10 @@ public class HAContainerMap implements HAContainerWrapper {
}
public boolean containsValue(Object value) {
- // return map.containsValue(value);
throw new UnsupportedOperationException("containsValue() not supported.");
}
public Set entrySet() {
- // return map.entrySet();
throw new UnsupportedOperationException("entrySet() not supported.");
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 6928b8d..8c241b2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -609,10 +609,20 @@ public class HARegionQueue implements RegionQueue {
logger.debug("{}: adding message to GII queue of size {}: {}", this.regionName,
giiQueue.size(), object);
}
+ HAEventWrapper haContainerKey = null;
+
if (object instanceof HAEventWrapper) {
- // bug #43609 - prevent loss of the message while in the queue
- putEntryConditionallyIntoHAContainer((HAEventWrapper) object);
+ HAEventWrapper wrapper = (HAEventWrapper) object;
+ wrapper.incrementPutInProgressCounter();
+ if (logger.isDebugEnabled()) {
+ logger
+ .debug("Incremented PutInProgressCounter during GII queueing. Event ID hash code: "
+ + wrapper.hashCode() + "; System ID hash code: "
+ + System.identityHashCode(wrapper)
+ + "; Wrapper details: " + wrapper);
+ }
}
+
this.giiQueue.add(object);
} else {
if (logger.isTraceEnabled()) {
@@ -764,24 +774,21 @@ public class HARegionQueue implements RegionQueue {
}
if (value instanceof HAEventWrapper) {
if (((HAEventWrapper) value).getClientUpdateMessage() == null) {
- // if there is no wrapped message look for it in the HA container map
- ClientUpdateMessageImpl haContainerMessage =
- (ClientUpdateMessageImpl) haContainer.get(value);
- if (haContainerMessage != null) {
- ((HAEventWrapper) value).setClientUpdateMessage(haContainerMessage);
- } else {
- if (isDebugEnabled) {
- logger.debug(
- "{} ATTENTION: found gii queued event with null event message. Please see bug #44852: {}",
- this.regionName, value);
- }
- continue;
+ if (isDebugEnabled) {
+ logger.debug(
+ "{} ATTENTION: found gii queued event with null event message. Please see bug #44852: {}",
+ this.regionName, value);
}
+ continue;
}
}
+
basicPut(value);
+
+ // The HAEventWrapper putInProgressCounter must be decremented because it was
+ // incremented when it was queued in giiQueue.
if (value instanceof HAEventWrapper) {
- decAndRemoveFromHAContainer((HAEventWrapper) value);
+ ((HAEventWrapper) value).decrementPutInProgressCounter();
}
} catch (NoSuchElementException ignore) {
break;
@@ -1038,6 +1045,10 @@ public class HARegionQueue implements RegionQueue {
void publish(Long position) throws InterruptedException {
acquireWriteLock();
try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding position " + position + " to available IDs. Region: " + regionName);
+ }
+
this.idsAvailable.add(position);
// Notify the waiting peek threads or take threads of blocking queue
// A void operation for the non blocking queue operations
@@ -1054,11 +1065,16 @@ public class HARegionQueue implements RegionQueue {
/**
* @param position Long value present in the Available IDs map against which Event object is
* present in HARegion. This function is directly invoked from the basicInvalidate function
- * where expiry is aborted if this function returns false
+ * where
+ * expiry is aborted if this function returns false
* @return boolean true if the position could be removed from the Set
* @throws InterruptedException *
*/
public boolean destroyFromAvailableIDs(Long position) throws InterruptedException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removing position " + position + " from available IDs. Region: " + regionName);
+ }
+
boolean removedOK = false;
acquireWriteLock();
try {
@@ -1086,6 +1102,11 @@ public class HARegionQueue implements RegionQueue {
* specified was removed from the Set
*/
protected boolean destroyFromAvailableIDsAndRegion(Long position) throws InterruptedException {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removing position " + position + " from available IDs and region. Region: "
+ + this.regionName);
+ }
+
boolean removedOK = this.destroyFromAvailableIDs(position);
if (removedOK) {
@@ -1359,10 +1380,12 @@ public class HARegionQueue implements RegionQueue {
@Override
public Object peek() throws InterruptedException {
- if (Thread.interrupted())
+ if (Thread.interrupted()) {
throw new InterruptedException();
+ }
Conflatable object = null;
Long next = null;
+
while (true) {
try {
next = (Long) this.getNextAvailableIDFromList();
@@ -1372,13 +1395,18 @@ public class HARegionQueue implements RegionQueue {
} catch (TimeoutException ignore) {
throw new InterruptedException();
}
+
object = (Conflatable) this.region.get(next);
+
+ // It is possible for the object to be null if a queue removal
+ // occurred between getting the next available ID and getting the object
+ // from the region. If this happens, on the next iteration of this loop we will
+ // get a different available ID to process
if (object != null) {
- // peeked a object, so add the correponding counter to thread-context
object = (object instanceof HAEventWrapper) ? (Conflatable) this.haContainer.get(object)
: object;
- if (object != null) { // Is it possible for object to be null...when?
+ if (object != null) {
List peekedEvents;
if ((peekedEvents = (List) HARegionQueue.peekedEventsContext.get()) != null) {
peekedEvents.add(next);
@@ -2111,13 +2139,18 @@ public class HARegionQueue implements RegionQueue {
continue;
}
synchronized (entryHaEventWrapper) {
- if ((HAEventWrapper) haContainer.getKey(entryHaEventWrapper) != null) {
+ if (entryHaEventWrapper == (HAEventWrapper) haContainer.getKey(entryHaEventWrapper)) {
entryHaEventWrapper.incAndGetReferenceCount();
addClientCQsAndInterestList(entryMessage, inputHaEventWrapper, haContainer,
regionName);
inputHaEventWrapper.setClientUpdateMessage(null);
newValueCd =
new VMCachedDeserializable(entryHaEventWrapper, newValueCd.getSizeInBytes());
+ if (logger.isDebugEnabled()) {
+ logger.debug("GII Update of Event ID hash code: " + entryHaEventWrapper.hashCode()
+ + "; System ID hash code: " + System.identityHashCode(entryHaEventWrapper)
+ + "; Wrapper details: " + entryHaEventWrapper);
+ }
} else {
entryHaEventWrapper = null;
}
@@ -2127,7 +2160,11 @@ public class HARegionQueue implements RegionQueue {
inputHaEventWrapper.incAndGetReferenceCount();
inputHaEventWrapper.setHAContainer(haContainer);
inputHaEventWrapper.setClientUpdateMessage(null);
- inputHaEventWrapper.setIsRefFromHAContainer(true);
+ if (logger.isDebugEnabled()) {
+ logger.debug("GII Add of Event ID hash code: " + inputHaEventWrapper.hashCode()
+ + "; System ID hash code: " + System.identityHashCode(inputHaEventWrapper)
+ + "; Wrapper details: " + entryHaEventWrapper);
+ }
}
break;
}
@@ -2972,6 +3009,8 @@ public class HARegionQueue implements RegionQueue {
}
boolean rejected = false;
+ Conflatable eventInHARegion = null;
+
synchronized (this) {
if (sequenceID > this.lastSequenceIDPut) {
if (logger.isTraceEnabled()) {
@@ -2999,11 +3038,12 @@ public class HARegionQueue implements RegionQueue {
if (lastDispatchedSequenceId == TOKEN_DESTROYED) {
return false;
}
+
if (sequenceID > lastDispatchedSequenceId || owningQueue.puttingGIIDataInQueue) {
// Insert the object into the Region
Long position = owningQueue.tailKey.incrementAndGet();
- owningQueue.putEventInHARegion(event, position);
+ eventInHARegion = owningQueue.putEventInHARegion(event, position);
// Add the position counter to the LinkedHashSet
if (this.counters == null) {
@@ -3012,12 +3052,12 @@ public class HARegionQueue implements RegionQueue {
this.counters.put(position, null);
// Check if the event is conflatable
- if (owningQueue.shouldBeConflated(event)) {
+ if (owningQueue.shouldBeConflated(eventInHARegion)) {
// Add to the conflation map & get the position of the
// old conflatable entry. The old entry may have inserted by the
// same
// ThreadIdentifier or different one.
- oldPosition = owningQueue.addToConflationMap(event, position);
+ oldPosition = owningQueue.addToConflationMap(eventInHARegion, position);
}
// Take the size lock & add to the list of availabelIds
@@ -3037,7 +3077,7 @@ public class HARegionQueue implements RegionQueue {
ccn.getClientProxy(owningQueue.clientProxyID).getStatistics().incMessagesFailedQueued();
}
} else {
- owningQueue.entryEnqueued(event);
+ owningQueue.entryEnqueued(eventInHARegion);
}
// Remove the old conflated position
if (oldPosition != null) {
@@ -3073,7 +3113,8 @@ public class HARegionQueue implements RegionQueue {
}
// <HA overflow>
if (conflatable instanceof HAEventWrapper) {
- owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable);
+ owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable,
+ "Remove Old Conflated Entry");
}
// </HA overflow>
// update statistics
@@ -3227,7 +3268,9 @@ public class HARegionQueue implements RegionQueue {
if (((HAEventWrapper) event).getReferenceCount() == 0 && logger.isDebugEnabled()) {
logger.debug("Reference count is already zero for event {}", event.getEventId());
}
- owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) event);
+
+ owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) event,
+ "Queue Removal Message");
}
// At this point we know we're going to remove the event,
@@ -3287,7 +3330,7 @@ public class HARegionQueue implements RegionQueue {
}
// <HA overflow>
if (wrapper instanceof HAEventWrapper) {
- owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) wrapper);
+ owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) wrapper, "Message Dispatcher");
}
// </HA overflow>
owningQueue.stats.incEventsRemoved();
@@ -3428,61 +3471,24 @@ public class HARegionQueue implements RegionQueue {
*
* @since GemFire 5.7
*/
- protected void putEventInHARegion(Conflatable event, Long position) {
+ protected Conflatable putEventInHARegion(Conflatable event, Long position) {
if (event instanceof HAEventWrapper) {
HAEventWrapper inputHaEventWrapper = (HAEventWrapper) event;
+ HAEventWrapper haContainerKey = null;
+
if (this.isQueueInitialized()) {
- if (inputHaEventWrapper.getIsRefFromHAContainer()) {
- putEntryConditionallyIntoHAContainer(inputHaEventWrapper);
- } else {
- // This means that the haEventWrapper reference we have is not
- // authentic, i.e. it doesn't refer to the HAEventWrapper instance
- // in the haContainer, but to the one outside it.
- HAEventWrapper haContainerKey = null;
- do {
- ClientUpdateMessageImpl haContainerEntry =
- (ClientUpdateMessageImpl) ((HAContainerWrapper) this.haContainer)
- .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage());
- if (haContainerEntry != null) {
- haContainerKey = (HAEventWrapper) ((HAContainerWrapper) this.haContainer)
- .getKey(inputHaEventWrapper);
- if (haContainerKey == null) {
- continue;
- }
- synchronized (haContainerKey) {
- // assert the entry is still present
- if (((HAContainerWrapper) this.haContainer).getKey(haContainerKey) != null) {
- haContainerKey.incAndGetReferenceCount();
- addClientCQsAndInterestList(haContainerEntry, inputHaEventWrapper,
- this.haContainer, this.regionName);
- inputHaEventWrapper = haContainerKey;
- } else {
- haContainerKey = null;
- }
- }
- } else {
- synchronized (inputHaEventWrapper) {
- inputHaEventWrapper.incAndGetReferenceCount();
- inputHaEventWrapper.setHAContainer(this.haContainer);
- if (!inputHaEventWrapper.getPutInProgress()) {
- // This means that this is a GII'ed event. Hence we must
- // explicitly set 'clientUpdateMessage' to null.
- inputHaEventWrapper.setClientUpdateMessage(null);
- }
- inputHaEventWrapper.setIsRefFromHAContainer(true);
- }
- break;
- }
- } while (haContainerKey == null);
- }
+ haContainerKey = putEntryConditionallyIntoHAContainer(inputHaEventWrapper);
+ } else {
+ haContainerKey = inputHaEventWrapper;
}
- // Put the reference to the HAEventWrapper instance into the
- // HA queue.
+
if (logger.isDebugEnabled()) {
- logger.debug("adding inputHaEventWrapper to HARegion at " + position + ":"
- + inputHaEventWrapper + " for " + this.regionName);
+ logger.debug("adding haContainerKey to HARegion at " + position + ":"
+ + haContainerKey + " for " + this.regionName);
}
- this.region.put(position, inputHaEventWrapper);
+ this.region.put(position, haContainerKey);
+
+ return haContainerKey;
} else { // (event instanceof ClientMarkerMessageImpl OR ConflatableObject OR
// ClientInstantiatorMessage)
if (logger.isDebugEnabled()) {
@@ -3490,6 +3496,8 @@ public class HARegionQueue implements RegionQueue {
+ this.regionName);
}
this.region.put(position, event);
+
+ return event;
}
}
@@ -3503,18 +3511,17 @@ public class HARegionQueue implements RegionQueue {
msg.addClientCqs(proxyID, clientCQ);
}
}
- // if (haEventWrapper.getPutInProgress()) {
- // ((HAEventWrapper)entry.getKey()).setPutInProgress(true);
- // }
// This is a remote HAEventWrapper.
// Add new Interested client lists.
ClientUpdateMessageImpl clientMsg =
(ClientUpdateMessageImpl) haEventWrapper.getClientUpdateMessage();
- if (clientMsg.isClientInterestedInUpdates(proxyID)) {
- msg.addClientInterestList(proxyID, true);
- } else if (clientMsg.isClientInterestedInInvalidates(proxyID)) {
- msg.addClientInterestList(proxyID, false);
+ if (clientMsg != null) {
+ if (clientMsg.isClientInterestedInUpdates(proxyID)) {
+ msg.addClientInterestList(proxyID, true);
+ } else if (clientMsg.isClientInterestedInInvalidates(proxyID)) {
+ msg.addClientInterestList(proxyID, false);
+ }
}
}
@@ -3522,17 +3529,69 @@ public class HARegionQueue implements RegionQueue {
* If the wrapper's referenceCount becomes 1 after increment, then set this haEventWrapper and its
* clientUpdateMessage into the haContainer as <key, value>.
*
- * @param haEventWrapper An instance of {@code HAEventWrapper}
+ * @param inputHaEventWrapper An instance of {@code HAEventWrapper}
* @since GemFire 5.7
*/
- protected void putEntryConditionallyIntoHAContainer(HAEventWrapper haEventWrapper) {
- if (haEventWrapper.incAndGetReferenceCount() == 1) {
- // if (logger.isDebugEnabled()) {
- // logger.fine("Putting event in haContainer: " + haEventWrapper);
- // }
- haEventWrapper.setHAContainer(HARegionQueue.this.haContainer);
- this.haContainer.put(haEventWrapper, haEventWrapper.getClientUpdateMessage());
+ protected HAEventWrapper putEntryConditionallyIntoHAContainer(
+ HAEventWrapper inputHaEventWrapper) {
+ HAEventWrapper haContainerKey = null;
+
+ while (haContainerKey == null) {
+ ClientUpdateMessageImpl haContainerEntry =
+ (ClientUpdateMessageImpl) ((HAContainerWrapper) this.haContainer)
+ .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage());
+
+ if (haContainerEntry != null) {
+ haContainerKey = (HAEventWrapper) ((HAContainerWrapper) this.haContainer)
+ .getKey(inputHaEventWrapper);
+
+ // Key was already removed from the container, so continue
+ if (haContainerKey == null) {
+ continue;
+ }
+
+ synchronized (haContainerKey) {
+ // assert the entry is still present and we still have the same reference
+ if (haContainerKey == ((HAContainerWrapper) this.haContainer).getKey(haContainerKey)) {
+ haContainerKey.incAndGetReferenceCount();
+
+ addClientCQsAndInterestList(haContainerEntry, inputHaEventWrapper,
+ this.haContainer, this.regionName);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Putting updated event in haContainer with Event ID hash code: "
+ + haContainerKey.hashCode() + "; System ID hash code: "
+ + System.identityHashCode(haContainerKey)
+ + "; Wrapper details: " + haContainerKey);
+ }
+ } else {
+ haContainerKey = null;
+ }
+ }
+ } else {
+ synchronized (inputHaEventWrapper) {
+ inputHaEventWrapper.incAndGetReferenceCount();
+ inputHaEventWrapper.setHAContainer(this.haContainer);
+
+ if (!inputHaEventWrapper.getPutInProgress()) {
+ // This means that this is a GII'ed event. Hence we must
+ // explicitly set 'clientUpdateMessage' to null.
+ inputHaEventWrapper.setClientUpdateMessage(null);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Putting new event in haContainer with Event ID hash code: "
+ + inputHaEventWrapper.hashCode()
+ + "; System ID hash code: " + System.identityHashCode(inputHaEventWrapper)
+ + "; Wrapper details: " + inputHaEventWrapper);
+ }
+ }
+
+ haContainerKey = inputHaEventWrapper;
+ }
}
+
+ return haContainerKey;
}
/**
@@ -3601,6 +3660,7 @@ public class HARegionQueue implements RegionQueue {
for (int i = 0; i < wrapperArray.length; i++) {
wrapperSet.add(this.region.get(wrapperArray[i]));
}
+
// Start a new thread which will update the clientMessagesRegion for
// each of the HAEventWrapper instances present in the wrapperSet
Thread regionCleanupTask = new Thread(new Runnable() {
@@ -3611,7 +3671,8 @@ public class HARegionQueue implements RegionQueue {
while (iter.hasNext()) {
Conflatable conflatable = (Conflatable) iter.next();
if (conflatable instanceof HAEventWrapper) {
- HARegionQueue.this.decAndRemoveFromHAContainer((HAEventWrapper) conflatable);
+ HARegionQueue.this
+ .decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Destroy");
}
}
} catch (CancelException ignore) {
@@ -3624,7 +3685,7 @@ public class HARegionQueue implements RegionQueue {
}
}
}
- });
+ }, "HA Region Cleanup for " + regionName);
regionCleanupTask.start();
}
} catch (CancelException e) {
@@ -3662,7 +3723,7 @@ public class HARegionQueue implements RegionQueue {
HAEventWrapper wrapper = (HAEventWrapper) conflatable;
msg = (Conflatable) HARegionQueue.this.haContainer.get(wrapper);
if (msg != null) {
- decAndRemoveFromHAContainer(wrapper);
+ decAndRemoveFromHAContainer(wrapper, "GetAndRemoveFromHAContainer");
}
} else {
msg = conflatable;
@@ -3671,23 +3732,44 @@ public class HARegionQueue implements RegionQueue {
}
/**
- * IMPORTANT: <br>
- * The wrapper passed here must be the authentic wrapper, i.e. it must be the one referred by the
- * HARegion underlying this queue. <br>
- * Decrements wrapper's reference count by one. If the decremented ref count is zero and put is
- * not in progress, removes the entry from the haContainer.
+ * Decrements reference count for the wrapper in the container by one. If the decremented ref
+ * count is zero and put is not in progress, removes the entry from the haContainer.
*
* @since GemFire 5.7
*/
public void decAndRemoveFromHAContainer(HAEventWrapper wrapper) {
- if (wrapper.decAndGetReferenceCount() == 0L && !wrapper.getPutInProgress()) {
- synchronized (wrapper) {
- if (wrapper.getReferenceCount() == 0L) {
+ decAndRemoveFromHAContainer(wrapper, "");
+ }
+
+ public void decAndRemoveFromHAContainer(HAEventWrapper wrapper, String caller) {
+ boolean decAndRemovePerformed = false;
+
+ while (!decAndRemovePerformed) {
+ HAEventWrapper haContainerKey =
+ (HAEventWrapper) ((HAContainerWrapper) haContainer).getKey(wrapper);
+
+ if (haContainerKey == null) {
+ break;
+ }
+
+ synchronized (haContainerKey) {
+ if (haContainerKey == (HAEventWrapper) ((HAContainerWrapper) haContainer).getKey(wrapper)) {
if (logger.isDebugEnabled()) {
- logger.debug("Removing event from {}: {}", this.region.getFullPath(),
- wrapper.getEventId());
+ logger.debug(caller + " decremented Event ID hash code: " + haContainerKey.hashCode()
+ + "; System ID hash code: " + System.identityHashCode(haContainerKey)
+ + "; Wrapper details: " + haContainerKey);
+ }
+ if (haContainerKey.decAndGetReferenceCount() == 0L) {
+ HARegionQueue.this.haContainer.remove(haContainerKey);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ caller + " removed Event ID hash code: " + haContainerKey.hashCode()
+ + "; System ID hash code: "
+ + System.identityHashCode(haContainerKey)
+ + "; Wrapper details: " + haContainerKey);
+ }
}
- HARegionQueue.this.haContainer.remove(wrapper);
+ decAndRemovePerformed = true;
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 43fa076..743cac8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -892,9 +892,15 @@ public class CacheClientNotifier {
}
} else {
HAEventWrapper wrapper = new HAEventWrapper(clientMessage);
- // Set the putInProgress flag to true before starting the put on proxy's
- // HA queues. Nowhere else, this flag is being set to true.
- wrapper.setPutInProgress(true);
+ wrapper.incrementPutInProgressCounter();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Initial increment PutInProgressCounter on HAEventWrapper with Event ID hash code: "
+ + wrapper.hashCode() + "; System ID hash code: "
+ + System.identityHashCode(wrapper) + "; Wrapper details: " + wrapper);
+ }
+
conflatable = wrapper;
}
@@ -990,7 +996,11 @@ public class CacheClientNotifier {
this.blackListSlowReceiver(proxy);
}
}
- checkAndRemoveFromClientMsgsRegion(conflatable);
+
+ if (conflatable instanceof HAEventWrapper) {
+ ((HAEventWrapper) conflatable).decrementPutInProgressCounter();
+ }
+
// Remove any dead clients from the clients to notify
if (deadProxies != null) {
closeDeadProxies(deadProxies, false);
@@ -1302,47 +1312,6 @@ public class CacheClientNotifier {
}
/**
- * If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present
- * in the haContainer, set the reference to the clientUpdateMessage to null and putInProgress flag
- * to false. Also, if the ref count is zero, then remove the entry from the haContainer.
- *
- * @since GemFire 5.7
- */
- private void checkAndRemoveFromClientMsgsRegion(Conflatable conflatable) {
- if (haContainer == null) {
- return;
- }
- if (conflatable instanceof HAEventWrapper) {
- HAEventWrapper wrapper = (HAEventWrapper) conflatable;
- if (!wrapper.getIsRefFromHAContainer()) {
- wrapper = (HAEventWrapper) haContainer.getKey(wrapper);
- if (wrapper != null && !wrapper.getPutInProgress()) {
- synchronized (wrapper) {
- if (wrapper.getReferenceCount() == 0L) {
- if (logger.isDebugEnabled()) {
- logger.debug("Removing event from haContainer: {}", wrapper);
- }
- haContainer.remove(wrapper);
- }
- }
- }
- } else {
- // This wrapper resides in haContainer.
- wrapper.setClientUpdateMessage(null);
- wrapper.setPutInProgress(false);
- synchronized (wrapper) {
- if (wrapper.getReferenceCount() == 0L) {
- if (logger.isDebugEnabled()) {
- logger.debug("Removing event from haContainer: {}", wrapper);
- }
- haContainer.remove(wrapper);
- }
- }
- }
- }
- }
-
- /**
* Returns the <code>CacheClientProxy</code> associated to the membershipID *
*
* @return the <code>CacheClientProxy</code> associated to the membershipID
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 15317da..84a67e3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -1544,6 +1544,7 @@ public class CacheClientProxy implements ClientSession {
protected void deliverMessage(Conflatable conflatable) {
ThreadState state = this.securityService.bindSubject(this.subject);
ClientUpdateMessage clientMessage = null;
+
if (conflatable instanceof HAEventWrapper) {
clientMessage = ((HAEventWrapper) conflatable).getClientUpdateMessage();
} else {
@@ -1571,7 +1572,22 @@ public class CacheClientProxy implements ClientSession {
"Message dispatcher for proxy {} is getting initialized. Adding message to the queuedEvents.",
this);
}
+
+ if (conflatable instanceof HAEventWrapper) {
+ HAEventWrapper haEventWrapper = (HAEventWrapper) conflatable;
+ haEventWrapper.incrementPutInProgressCounter();
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Incremented PutInProgressCounter on HAEventWrapper with Event ID hash code: "
+ + haEventWrapper.hashCode()
+ + "; System ID hash code: "
+ + System.identityHashCode(haEventWrapper) + "; Wrapper details: "
+ + haEventWrapper);
+ }
+ }
+
this.queuedEvents.add(conflatable);
+
return;
}
}
@@ -1581,13 +1597,24 @@ public class CacheClientProxy implements ClientSession {
this._messageDispatcher.enqueueMessage(conflatable);
} else {
this._statistics.incMessagesFailedQueued();
+
if (logger.isDebugEnabled()) {
logger.debug(
- "Message is not added to the queue. Message dispatcher for proxy: {} doesn't exist.",
- this);
+ "Message was not added to the queue. Message dispatcher was null for proxy: " + this
+ + ". Event ID hash code: " + conflatable.hashCode() + "; System ID hash code: "
+ + System.identityHashCode(conflatable) + "; Conflatable details: " + conflatable
+ .toString());
}
}
} else {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Message was not added to the queue. Event ID hash code: " + conflatable.hashCode()
+ + "; System ID hash code: "
+ + System.identityHashCode(conflatable) + "; Conflatable details: " + conflatable
+ .toString());
+ }
+
this._statistics.incMessagesFailedQueued();
}
@@ -1690,17 +1717,13 @@ public class CacheClientProxy implements ClientSession {
logger.debug("{} draining {} events from init queue into intialized queue", this,
this.queuedEvents.size());
}
- Conflatable nextEvent;
- while ((nextEvent = queuedEvents.poll()) != null) {
- this._messageDispatcher.enqueueMessage(nextEvent);
- }
+
+ drainQueuedEvents(false);
// Now finish emptying the queue with synchronization to make
// sure we don't miss any events.
synchronized (this.queuedEventsSync) {
- while ((nextEvent = queuedEvents.poll()) != null) {
- this._messageDispatcher.enqueueMessage(nextEvent);
- }
+ drainQueuedEvents(true);
this.messageDispatcherInit = false; // Done initialization.
}
@@ -1711,6 +1734,29 @@ public class CacheClientProxy implements ClientSession {
}
}
+ private void drainQueuedEvents(boolean withSynchronization) {
+ Conflatable nextEvent;
+ while ((nextEvent = queuedEvents.poll()) != null) {
+ if (logger.isDebugEnabled()) {
+ if (nextEvent instanceof HAEventWrapper) {
+ logger.debug(
+ "Draining events queued during message dispatcher initialization "
+ + (withSynchronization ? "with" : "without")
+ + " synchronization. Event ID hash code: "
+ + nextEvent.hashCode()
+ + "; System ID hash code: " + System.identityHashCode(nextEvent)
+ + "; Wrapper details: " + nextEvent);
+ }
+ }
+
+ this._messageDispatcher.enqueueMessage(nextEvent);
+
+ if (nextEvent instanceof HAEventWrapper) {
+ ((HAEventWrapper) nextEvent).decrementPutInProgressCounter();
+ }
+ }
+ }
+
protected void startOrResumeMessageDispatcher(boolean processedMarker) {
// Only start or resume the dispatcher if it is Primary
if (this.isPrimary) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
index 9a31992..a755efa 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
@@ -87,16 +87,13 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
AtomicLongFieldUpdater.newUpdater(HAEventWrapper.class, "referenceCount");
/**
- * If true, the entry containing this HAEventWrapper instance will not be removed from the
- * haContainer, even if the referenceCount value is zero.
+ * If greater than zero, the entry containing this HAEventWrapper instance will not be removed
+ * from the haContainer, even if the referenceCount value is zero.
*/
- private transient boolean putInProgress = false;
-
- /**
- * A value true indicates that this instance is not used in the <code>haContainer</code>. So any
- * changes in this instance will not be visible to the <code>haContainer</code>.
- */
- private transient boolean isRefFromHAContainer = false;
+ @SuppressWarnings("unused")
+ private transient volatile long putInProgressCount;
+ private static final AtomicLongFieldUpdater<HAEventWrapper> putInProgressCountUpdater =
+ AtomicLongFieldUpdater.newUpdater(HAEventWrapper.class, "putInProgressCount");
/**
* A reference to its <code>ClientUpdateMessage</code> instance.
@@ -119,6 +116,7 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
this.shouldConflate = event.shouldBeConflated();
this.eventIdentifier = event.getEventId();
rcUpdater.set(this, 0);
+ putInProgressCountUpdater.set(this, 0);
this.clientUpdateMessage = event;
this.clientCqs = ((ClientUpdateMessageImpl) event).getClientCqs();
}
@@ -128,6 +126,7 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
this.clientUpdateMessage = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_CREATE,
new ClientProxyMembershipID(), eventId);
rcUpdater.set(this, 0);
+ putInProgressCountUpdater.set(this, 0);
}
/**
@@ -208,22 +207,6 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
return this.clientCqs;
}
- public void setPutInProgress(boolean inProgress) {
- this.putInProgress = inProgress;
- }
-
- public boolean getPutInProgress() {
- return this.putInProgress;
- }
-
- public void setIsRefFromHAContainer(boolean bool) {
- this.isRefFromHAContainer = bool;
- }
-
- public boolean getIsRefFromHAContainer() {
- return this.isRefFromHAContainer;
- }
-
public void setHAContainer(Map container) {
this.haContainer = container;
}
@@ -258,12 +241,13 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
@Override
public String toString() {
if (this.clientUpdateMessage != null) {
- return "HAEventWrapper[refCount=" + getReferenceCount() + "; msg=" + this.clientUpdateMessage
- + "]";
+ return "HAEventWrapper[refCount=" + getReferenceCount() + "; putInProgress="
+ + putInProgressCountUpdater.get(this) + "; msg=" + this.clientUpdateMessage + "]";
} else {
return "HAEventWrapper[region=" + this.regionName + "; key=" + this.keyOfInterest
- + "; refCount=" + getReferenceCount() + "; inContainer=" + this.isRefFromHAContainer
- + "; putInProgress=" + this.putInProgress + "; event=" + this.eventIdentifier
+ + "; refCount=" + getReferenceCount()
+ + "; putInProgress=" + putInProgressCountUpdater.get(this) + "; event="
+ + this.eventIdentifier
+ ((this.clientUpdateMessage == null) ? "; no message" : ";with message")
+ ((this.clientUpdateMessage == null) ? ""
: ("; op=" + this.clientUpdateMessage.getOperation()))
@@ -411,4 +395,35 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
return rcUpdater.decrementAndGet(this);
}
+ public long incrementPutInProgressCounter() {
+ return putInProgressCountUpdater.incrementAndGet(this);
+ }
+
+ public long decrementPutInProgressCounter() {
+ synchronized (this) {
+ long putInProgressCounter = putInProgressCountUpdater.decrementAndGet(this);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Decremented PutInProgressCounter on HAEventWrapper with Event ID hash code: "
+ + hashCode() + "; System ID hash code: "
+ + System.identityHashCode(this) + "; Wrapper details: " + toString());
+ }
+
+ if (putInProgressCounter == 0L) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Setting HAEventWrapper ClientUpdateMessage to null. Event ID hash code: "
+ + hashCode()
+ + "; System ID hash code: " + System.identityHashCode(this) + "; Wrapper details: "
+ + toString());
+ }
+ setClientUpdateMessage(null);
+ }
+
+ return putInProgressCounter;
+ }
+ }
+
+ public boolean getPutInProgress() {
+ return putInProgressCountUpdater.get(this) > 0;
+ }
}