You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by la...@apache.org on 2018/08/01 18:25:39 UTC

[geode] branch develop updated: GEODE-5420: Protect events in HAContainer from premature modification

This is an automated email from the ASF dual-hosted git repository.

ladyvader pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 103b467  GEODE-5420: Protect events in HAContainer from premature modification
103b467 is described below

commit 103b467ab6205f6c9d180d5ad28705ce47c7b5bb
Author: Ryan McMahon <rm...@pivotal.io>
AuthorDate: Tue Jul 3 16:26:40 2018 -0700

    GEODE-5420: Protect events in HAContainer from premature modification
    
    - Updated putInProgress boolean in HAEventWrapper to a counter to prevent prematurely setting ClientUpdateMessage to null when events are temporarily queued during a GII or message dispatcher initialization
    - decAndRemoveFromHAContainer only removes when putInProgress counter and ref count are 0
    - Refactored putEventInHARegion/putConditionallyInHAContainer to prevent overwriting an existing entry in the HAContainer.  Also simplifies the code and reduces duplicated logic.
    - Wrote missing basic HARegionQueue unit/integration tests, and an integration test to capture setting the ClientUpdateMessage property on HAEventWrapper to null prematurely
    - Added new event tracing messages at debug logging level to help track similar issues in the future
    
    Co-authored-by: Ryan McMahon <rm...@pivotal.io>
    Co-authored-by: Lynn Hughes-Godfrey <lh...@pivotal.io>
---
 .../cache/ha/HARegionQueueIntegrationTest.java     | 430 +++++++++++++++++++--
 .../internal/cache/ha/HARegionQueueJUnitTest.java  | 419 +++++++++++++++++++-
 .../CacheClientNotifierIntegrationTest.java        | 339 ++++++++++++++++
 .../org/apache/geode/internal/cache/HARegion.java  |   2 +-
 .../geode/internal/cache/ha/HAContainerMap.java    |   6 -
 .../geode/internal/cache/ha/HARegionQueue.java     | 294 +++++++++-----
 .../cache/tier/sockets/CacheClientNotifier.java    |  59 +--
 .../cache/tier/sockets/CacheClientProxy.java       |  64 ++-
 .../cache/tier/sockets/HAEventWrapper.java         |  73 ++--
 9 files changed, 1450 insertions(+), 236 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
index 8a496d1..a1b71a1 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -18,31 +18,43 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mock;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+import util.TestException;
 
 import org.apache.geode.CancelCriterion;
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsType;
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
@@ -52,11 +64,15 @@ import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.DSClock;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheServerImpl;
 import org.apache.geode.internal.cache.CachedDeserializable;
-import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EnumListenerEvent;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
@@ -66,10 +82,13 @@ import org.apache.geode.internal.cache.InternalRegionArguments;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.VMCachedDeserializable;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
 import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
+import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
 import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.internal.concurrent.ConcurrentHashSet;
 import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
 
@@ -111,6 +130,34 @@ public class HARegionQueueIntegrationTest {
     return cache.createRegionFactory(RegionShortcut.REPLICATE).create("data");
   }
 
+  private InternalCache createMockInternalCache() {
+    InternalCache mockInternalCache = Mockito.mock(InternalCache.class);
+    doReturn(Mockito.mock(SystemTimer.class)).when(mockInternalCache).getCCPTimer();
+    doReturn(Mockito.mock(CancelCriterion.class)).when(mockInternalCache).getCancelCriterion();
+
+    InternalDistributedSystem mockInteralDistributedSystem = createMockInternalDistributedSystem();
+    doReturn(mockInteralDistributedSystem).when(mockInternalCache).getInternalDistributedSystem();
+    doReturn(mockInteralDistributedSystem).when(mockInternalCache).getDistributedSystem();
+
+    return mockInternalCache;
+  }
+
+  private InternalDistributedSystem createMockInternalDistributedSystem() {
+    InternalDistributedSystem mockInternalDistributedSystem =
+        Mockito.mock(InternalDistributedSystem.class);
+    DistributionManager mockDistributionManager = Mockito.mock(DistributionManager.class);
+
+    doReturn(Mockito.mock(InternalDistributedMember.class)).when(mockInternalDistributedSystem)
+        .getDistributedMember();
+    doReturn(Mockito.mock(Statistics.class)).when(mockInternalDistributedSystem)
+        .createAtomicStatistics(any(StatisticsType.class), any(String.class));
+    doReturn(Mockito.mock(DistributionConfig.class)).when(mockDistributionManager).getConfig();
+    doReturn(mockDistributionManager).when(mockInternalDistributedSystem).getDistributionManager();
+    doReturn(Mockito.mock(DSClock.class)).when(mockInternalDistributedSystem).getClock();
+
+    return mockInternalDistributedSystem;
+  }
+
   private CacheClientNotifier createCacheClientNotifier() {
     // Create a mock CacheClientNotifier
     CacheClientNotifier ccn = mock(CacheClientNotifier.class);
@@ -127,9 +174,18 @@ public class HARegionQueueIntegrationTest {
   }
 
   @Test
-  public void verifyEndGiiQueueingPutsHAEventWrapperNotClientUpdateMessage() throws Exception {
+  public void verifyEndGiiQueueingEmptiesQueueAndHAContainer() throws Exception {
+    // We need to use an actual cache client notifier for this test because we are making assertions
+    // based on the actual CacheClientNotifier.checkAndRemoveFromClientMsgsRegion() method.
+    InternalCache mockInternalCache = createMockInternalCache();
+    CacheClientNotifier cacheClientNotifier =
+        CacheClientNotifier.getInstance(mockInternalCache, Mockito.mock(CacheServerStats.class),
+            100000, 100000, Mockito.mock(ConnectionListener.class), null, false);
+    PowerMockito.when(CacheClientNotifier.getInstance()).thenReturn(cacheClientNotifier);
+
     // Create a HAContainerRegion
-    HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+    HAContainerWrapper haContainerWrapper =
+        (HAContainerWrapper) cacheClientNotifier.getHaContainer();
 
     // create message and HAEventWrapper
     ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
@@ -137,6 +193,7 @@ public class HARegionQueueIntegrationTest {
         new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem()));
     HAEventWrapper wrapper = new HAEventWrapper(message);
     wrapper.setHAContainer(haContainerWrapper);
+    wrapper.incrementPutInProgressCounter();
 
     // Create and update HARegionQueues forcing one queue to startGiiQueueing
     int numQueues = 10;
@@ -147,28 +204,33 @@ public class HARegionQueueIntegrationTest {
     assertEquals(1, haContainerWrapper.size());
 
     HAEventWrapper wrapperInContainer = (HAEventWrapper) haContainerWrapper.getKey(wrapper);
-    assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+    assertEquals(numQueues - 1, wrapperInContainer.getReferenceCount());
+    assertTrue(wrapperInContainer.getPutInProgress());
 
-    // Verify that the HAEventWrapper in the giiQueue now has msg = null
-    // this gets set to null when wrapper is added to HAContainer (for non-gii queues)
+    // Verify that the HAEventWrapper in the giiQueue now has msg != null
+    // We don't null this out while putInProgress > 0 (true)
     Queue giiQueue = targetQueue.getGiiQueue();
     assertEquals(1, giiQueue.size());
 
+    // Simulate that we have iterated through all interested proxies
+    // and are now decrementing the PutInProgressCounter
+    wrapperInContainer.decrementPutInProgressCounter();
+
+    // Simulate that other queues have processed this event, then
+    // peek and process the event off the giiQueue
+    for (int i = 0; i < numQueues - 1; ++i) {
+      targetQueue.decAndRemoveFromHAContainer(wrapper);
+    }
+
     HAEventWrapper giiQueueEntry = (HAEventWrapper) giiQueue.peek();
     assertNotNull(giiQueueEntry);
-    assertNull(giiQueueEntry.getClientUpdateMessage());
+    assertNotNull(giiQueueEntry.getClientUpdateMessage());
 
-    // endGiiQueueing and verify queue empty and putEventInHARegion invoked with HAEventWrapper
-    // not ClientUpdateMessageImpl
-    HARegionQueue spyTargetQueue = spy(targetQueue);
-    spyTargetQueue.endGiiQueueing();
+    // endGiiQueueing and verify queue and HAContainer are empty
+    targetQueue.endGiiQueueing();
     assertEquals(0, giiQueue.size());
 
-    ArgumentCaptor<Conflatable> eventCaptor = ArgumentCaptor.forClass(Conflatable.class);
-    verify(spyTargetQueue).putEventInHARegion(eventCaptor.capture(), anyLong());
-    Conflatable capturedEvent = eventCaptor.getValue();
-    assertTrue(capturedEvent instanceof HAEventWrapper);
-    assertNotNull(((HAEventWrapper) capturedEvent).getClientUpdateMessage());
+    Assert.assertEquals("Expected HAContainer to be empty", 0, haContainerWrapper.size());
   }
 
   @Test
@@ -235,11 +297,221 @@ public class HARegionQueueIntegrationTest {
     verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
   }
 
+  @Test
+  public void verifySimultaneousPutHAEventWrapperWithRegion() throws Exception {
+    HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+
+    final int numQueues = 30;
+    final int numOperations = 1000;
+
+    Set<HAEventWrapper> haEventWrappersToValidate =
+        createAndPutHARegionQueuesSimulataneously(haContainerWrapper, numQueues, numOperations);
+
+    assertEquals(numOperations, haContainerWrapper.size());
+
+    for (HAEventWrapper haEventWrapperToValidate : haEventWrappersToValidate) {
+      HAEventWrapper wrapperInContainer =
+          (HAEventWrapper) haContainerWrapper.getKey(haEventWrapperToValidate);
+      assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+    }
+  }
+
+  @Test
+  public void verifySequentialPutHAEventWrapperWithRegion() throws Exception {
+    HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+
+    ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+        (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+        new ClientProxyMembershipID(), new EventID(new byte[] {1}, 1, 2));
+    HAEventWrapper haEventWrapper = new HAEventWrapper(message);
+    haEventWrapper.setHAContainer(haContainerWrapper);
+
+    final int numQueues = 10;
+
+    createAndPutHARegionQueuesSequentially(haContainerWrapper, haEventWrapper, numQueues);
+
+    assertEquals(1, haContainerWrapper.size());
+
+    HAEventWrapper wrapperInContainer =
+        (HAEventWrapper) haContainerWrapper.getKey(haEventWrapper);
+    assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+  }
+
+  @Test
+  public void verifySimultaneousPutHAEventWrapperWithMap() throws Exception {
+    HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+    when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+    final int numQueues = 30;
+    final int numOperations = 1000;
+
+    Set<HAEventWrapper> haEventWrappersToValidate =
+        createAndPutHARegionQueuesSimulataneously(haContainerWrapper, numQueues, numOperations);
+
+    assertEquals(numOperations, haContainerWrapper.size());
+
+    for (HAEventWrapper haEventWrapperToValidate : haEventWrappersToValidate) {
+      HAEventWrapper wrapperInContainer =
+          (HAEventWrapper) haContainerWrapper.getKey(haEventWrapperToValidate);
+      assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+    }
+  }
+
+  @Test
+  public void verifySequentialPutHAEventWrapperWithMap() throws Exception {
+    HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+    when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+    ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+        (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+        new ClientProxyMembershipID(), new EventID(new byte[] {1}, 1, 2));
+    HAEventWrapper haEventWrapper = new HAEventWrapper(message);
+    haEventWrapper.setHAContainer(haContainerWrapper);
+
+    final int numQueues = 10;
+    createAndPutHARegionQueuesSequentially(haContainerWrapper, haEventWrapper, numQueues);
+
+    assertEquals(1, haContainerWrapper.size());
+
+    HAEventWrapper wrapperInContainer =
+        (HAEventWrapper) haContainerWrapper.getKey(haEventWrapper);
+    assertEquals(numQueues, wrapperInContainer.getReferenceCount());
+  }
+
+  @Test
+  public void queueRemovalAndDispatchingConcurrently() throws Exception {
+    // Create a HAContainerMap to be used by the CacheClientNotifier
+    HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+    when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+    List<HARegionQueue> regionQueues = new ArrayList<>();
+
+    for (int i = 0; i < 2; ++i) {
+      HARegion haRegion = Mockito.mock(HARegion.class);
+      when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
+
+      ConcurrentHashMap<Object, Object> mockRegion = new ConcurrentHashMap<>();
+
+      when(haRegion.put(Mockito.any(Object.class), Mockito.any(Object.class))).then(answer -> {
+        Object existingValue = mockRegion.put(answer.getArgument(0), answer.getArgument(1));
+        return existingValue;
+      });
+
+      when(haRegion.get(Mockito.any(Object.class))).then(answer -> {
+        return mockRegion.get(answer.getArgument(0));
+      });
+
+      doAnswer(answer -> {
+        mockRegion.remove(answer.getArgument(0));
+        return null;
+      }).when(haRegion).localDestroy(Mockito.any(Object.class));
+
+      regionQueues.add(createHARegionQueue(haContainerWrapper, i, haRegion, false));
+    }
+
+    ExecutorService service = Executors.newFixedThreadPool(2);
+
+    List<Callable<Object>> callables = new ArrayList<>();
+
+    for (int i = 0; i < 10000; ++i) {
+      callables.clear();
+
+      EventID eventID = new EventID(new byte[] {1}, 1, i);
+
+      ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+          (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+          new ClientProxyMembershipID(), eventID);
+
+      HAEventWrapper wrapper = new HAEventWrapper(message);
+      wrapper.setHAContainer(haContainerWrapper);
+      wrapper.incrementPutInProgressCounter();
+
+      for (HARegionQueue queue : regionQueues) {
+        queue.put(wrapper);
+      }
+
+      wrapper.decrementPutInProgressCounter();
+
+      for (HARegionQueue queue : regionQueues) {
+        callables.add(Executors.callable(() -> {
+          try {
+            queue.peek();
+            queue.remove();
+          } catch (Exception ex) {
+            throw new RuntimeException(ex);
+          }
+        }));
+
+        callables.add(Executors.callable(() -> {
+          try {
+            queue.removeDispatchedEvents(eventID);
+          } catch (Exception ex) {
+            throw new RuntimeException(ex);
+          }
+        }));
+      }
+
+      // invokeAll() blocks until all submitted callables complete or the timeout expires
+      List<Future<Object>> futures = service.invokeAll(callables, 10, TimeUnit.SECONDS);
+
+      for (Future<Object> future : futures) {
+        try {
+          future.get();
+        } catch (Exception ex) {
+          throw new TestException(
+              "Exception thrown while executing regionQueue methods concurrently on iteration: "
+                  + i,
+              ex);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void verifyPutEntryConditionallyInHAContainerNoOverwrite() throws Exception {
+    HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+
+    // create message and HAEventWrapper
+    EventID eventID = new EventID(cache.getDistributedSystem());
+    ClientUpdateMessage oldMessage = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+        (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+        new ClientProxyMembershipID(), eventID);
+    HAEventWrapper originalWrapperInstance = new HAEventWrapper(oldMessage);
+    originalWrapperInstance.incrementPutInProgressCounter();
+    originalWrapperInstance.setHAContainer(haContainerWrapper);
+
+    HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, 0);
+
+    haRegionQueue.putEventInHARegion(originalWrapperInstance, 1L);
+
+    // Simulate a QRM for this event
+    haRegionQueue.region.destroy(1L);
+    haRegionQueue.decAndRemoveFromHAContainer(originalWrapperInstance);
+
+    ClientUpdateMessage newMessage = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+        (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+        new ClientProxyMembershipID(), eventID);
+    HAEventWrapper newWrapperInstance = new HAEventWrapper(newMessage);
+    newWrapperInstance.incrementPutInProgressCounter();
+    newWrapperInstance.setHAContainer(haContainerWrapper);
+
+    haRegionQueue.putEventInHARegion(newWrapperInstance, 1L);
+
+    // Add the original wrapper back in, and verify that it does not overwrite the new one
+    // and that it increments the ref count on the container key.
+    haRegionQueue.putEventInHARegion(originalWrapperInstance, 1L);
+
+    Assert.assertEquals("Original message overwrote new message in container",
+        haContainerWrapper.get(originalWrapperInstance),
+        newWrapperInstance.getClientUpdateMessage());
+    Assert.assertEquals("Reference count was not the expected value", 2,
+        newWrapperInstance.getReferenceCount());
+    Assert.assertEquals("Container size was not the expected value", haContainerWrapper.size(), 1);
+  }
+
   private HAContainerRegion createHAContainerRegion() throws Exception {
-    // Create a Region to be used by the HAContainerRegion
     Region haContainerRegionRegion = createHAContainerRegionRegion();
 
-    // Create an HAContainerRegion
     HAContainerRegion haContainerRegion = new HAContainerRegion(haContainerRegionRegion);
 
     return haContainerRegion;
@@ -261,25 +533,27 @@ public class HARegionQueueIntegrationTest {
     return region;
   }
 
-  private HARegionQueue createHARegionQueue(Map haContainer, int index) throws Exception {
+  private HARegionQueue createHARegionQueue(Map haContainer, int index, HARegion haRegion,
+      boolean puttingGIIDataInQueue) throws Exception {
     StoppableReentrantReadWriteLock giiLock = Mockito.mock(StoppableReentrantReadWriteLock.class);
-    when(giiLock.writeLock())
-        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class));
-    when(giiLock.readLock())
-        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
+    doReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class)).when(giiLock)
+        .writeLock();
+    doReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class)).when(giiLock)
+        .readLock();
 
-    StoppableReentrantReadWriteLock rwLock = Mockito.mock(StoppableReentrantReadWriteLock.class);
-    when(rwLock.writeLock())
-        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableWriteLock.class));
-    when(rwLock.readLock())
-        .thenReturn(Mockito.mock(StoppableReentrantReadWriteLock.StoppableReadLock.class));
+    StoppableReentrantReadWriteLock rwLock =
+        new StoppableReentrantReadWriteLock(cache.getCancelCriterion());
 
+    return new HARegionQueue("haRegion+" + index, haRegion, (InternalCache) cache, haContainer,
+        null, (byte) 1, true, mock(HARegionQueueStats.class), giiLock, rwLock,
+        mock(CancelCriterion.class), puttingGIIDataInQueue);
+  }
+
+  private HARegionQueue createHARegionQueue(Map haContainer, int index) throws Exception {
     HARegion haRegion = Mockito.mock(HARegion.class);
     when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
 
-    return new HARegionQueue("haRegion+" + index, haRegion, (InternalCache) cache, haContainer,
-        null, (byte) 1, true, mock(HARegionQueueStats.class), giiLock, rwLock,
-        mock(CancelCriterion.class), false);
+    return createHARegionQueue(haContainer, index, haRegion, false);
   }
 
   private CachedDeserializable createCachedDeserializable(HAContainerWrapper haContainerWrapper)
@@ -319,19 +593,97 @@ public class HARegionQueueIntegrationTest {
 
     // create HARegionQueues and startGiiQueuing on a region about half way through
     for (int i = 0; i < numQueues; i++) {
-      HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, i);
+      HARegionQueue haRegionQueue = null;
 
       // start GII Queueing (targetRegionQueue)
       if (i == startGiiQueueingIndex) {
-        targetQueue = haRegionQueue;
-        targetQueue.startGiiQueueing();
+        HARegion haRegion = Mockito.mock(HARegion.class);
+
+        final HARegionQueue giiHaRegionQueue =
+            createHARegionQueue(haContainerWrapper, i, haRegion, false);;
+        giiHaRegionQueue.startGiiQueueing();
+        targetQueue = giiHaRegionQueue;
+
+        when(haRegion.put(Mockito.any(Object.class), Mockito.any(HAEventWrapper.class)))
+            .then(answer -> {
+              // Simulate that either a QRM or message dispatch has occurred immediately after the
+              // put.
+              // We want to ensure that the event is removed from the HAContainer if it is drained
+              // from the giiQueue
+              // and the ref count has dropped to 0.
+              HAEventWrapper haContainerKey = answer.getArgument(1);
+              giiHaRegionQueue.decAndRemoveFromHAContainer(haContainerKey);
+              return null;
+            });
+
+        when(haRegion.getGemFireCache()).thenReturn((InternalCache) cache);
+
+        haRegionQueue = giiHaRegionQueue;
+      } else {
+        haRegionQueue = createHARegionQueue(haContainerWrapper, i);
       }
 
       haRegionQueue.put(wrapper);
     }
+
     return targetQueue;
   }
 
+  private Set<HAEventWrapper> createAndPutHARegionQueuesSimulataneously(
+      HAContainerWrapper haContainerWrapper, int numQueues, int numOperations) throws Exception {
+    ConcurrentLinkedQueue<HARegionQueue> queues = new ConcurrentLinkedQueue<>();
+    final ConcurrentHashSet<HAEventWrapper> testValidationWrapperSet = new ConcurrentHashSet<>();
+    final AtomicInteger count = new AtomicInteger();
+
+    // create HARegionQueues
+    for (int i = 0; i < numQueues; i++) {
+      queues.add(createHARegionQueue(haContainerWrapper, i));
+    }
+
+    for (int i = 0; i < numOperations; i++) {
+      count.set(i);
+
+      ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_CREATE,
+          (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+          new ClientProxyMembershipID(), new EventID(new byte[] {1}, 1, count.get()));
+
+      queues.parallelStream().forEach(haRegionQueue -> {
+        try {
+          // In production (CacheClientNotifier.singletonRouteClientMessage), each queue has its
+          // own HAEventWrapper object even though they hold the same ClientUpdateMessage,
+          // so we create an object for each queue in here
+          HAEventWrapper haEventWrapper = new HAEventWrapper(message);
+
+          testValidationWrapperSet.add(haEventWrapper);
+
+          haRegionQueue.put(haEventWrapper);
+        } catch (InterruptedException iex) {
+          throw new RuntimeException(iex);
+        }
+      });
+    }
+
+    return testValidationWrapperSet;
+  }
+
+  private void createAndPutHARegionQueuesSequentially(HAContainerWrapper haContainerWrapper,
+      HAEventWrapper haEventWrapper, int numQueues) throws Exception {
+    ArrayList<HARegionQueue> queues = new ArrayList<>();
+
+    // create HARegionQueues
+    for (int i = 0; i < numQueues; i++) {
+      queues.add(createHARegionQueue(haContainerWrapper, i));
+    }
+
+    haEventWrapper.incrementPutInProgressCounter();
+
+    for (HARegionQueue queue : queues) {
+      queue.put(haEventWrapper);
+    }
+
+    haEventWrapper.decrementPutInProgressCounter();
+  }
+
   private void createAndUpdateHARegionQueuesSimultaneously(HAContainerWrapper haContainerWrapper,
       CachedDeserializable cd, int numQueues) throws Exception {
     // Create some HARegionQueues
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
index 350a91e..e1baf6c 100755
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java
@@ -25,22 +25,34 @@ import static org.hamcrest.CoreMatchers.sameInstance;
 import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.awaitility.Awaitility;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -48,6 +60,9 @@ import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ErrorCollector;
 import org.junit.rules.TestName;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
@@ -60,6 +75,10 @@ import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
+import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
 import org.apache.geode.test.dunit.ThreadUtils;
 import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
 
@@ -1366,6 +1385,396 @@ public class HARegionQueueJUnitTest {
             HARegionQueue.getMessageSyncInterval(), is(updatedMessageSyncInterval)));
   }
 
+  @Test
+  public void testPutEventInHARegion_Conflatable() throws Exception {
+    // A plain Conflatable must come back equal from the put and be stored at its position.
+    final String regionName = testName.getMethodName();
+    final long queuePosition = 0L;
+    HARegionQueue queue = createHARegionQueue(regionName);
+
+    Conflatable event =
+        new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, regionName);
+
+    Conflatable returned = queue.putEventInHARegion(event, queuePosition);
+    Assert.assertEquals(event, returned);
+
+    // The same event must also be readable from the queue's region under that position.
+    Conflatable stored = (Conflatable) queue.getRegion().get(queuePosition);
+    Assert.assertEquals(event, stored);
+  }
+
+  @Test
+  public void testPutEventInHARegion_HAEventWrapper_New() throws Exception {
+    // Scenario: the HA container hands back the very wrapper that is being put.
+    final long queuePosition = 0L;
+    HAEventWrapper wrapper = new HAEventWrapper(mock(EventID.class));
+
+    HARegionQueue queueSpy = Mockito.spy(
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE));
+
+    // Stub the container step so that it returns the wrapper it is given.
+    doReturn(wrapper).when(queueSpy).putEntryConditionallyIntoHAContainer(wrapper);
+
+    // The put must return that wrapper ...
+    Conflatable returned = queueSpy.putEventInHARegion(wrapper, queuePosition);
+    Assert.assertEquals(wrapper, returned);
+
+    // ... and the region must hold it at the requested position.
+    HAEventWrapper stored = (HAEventWrapper) queueSpy.getRegion().get(queuePosition);
+    Assert.assertEquals(wrapper, stored);
+
+    // The container step must have been invoked exactly once for this wrapper.
+    verify(queueSpy, times(1)).putEntryConditionallyIntoHAContainer(wrapper);
+  }
+
+  @Test
+  public void testPutEventInHARegion_HAEventWrapper_EntryAlreadyExisted() throws Exception {
+    final long queuePosition = 0L;
+    HARegionQueue queueSpy = Mockito.spy(
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE));
+
+    // Two distinct wrappers: one plays the entry the HA container already holds, the other is
+    // the wrapper being put now.
+    HAEventWrapper alreadyInContainer = new HAEventWrapper(mock(EventID.class));
+    HAEventWrapper incoming = new HAEventWrapper(mock(EventID.class));
+
+    // Stub the container step so that putting the incoming wrapper resolves to the existing one.
+    doReturn(alreadyInContainer).when(queueSpy)
+        .putEntryConditionallyIntoHAContainer(incoming);
+
+    // The put must return the pre-existing wrapper rather than the one passed in ...
+    Conflatable returned = queueSpy.putEventInHARegion(incoming, queuePosition);
+    Assert.assertEquals(alreadyInContainer, returned);
+
+    // ... and the pre-existing wrapper is what the region must hold at the position.
+    HAEventWrapper stored = (HAEventWrapper) queueSpy.getRegion().get(queuePosition);
+    Assert.assertEquals(alreadyInContainer, stored);
+
+    // The container step must have been invoked exactly once for the incoming wrapper.
+    verify(queueSpy, times(1)).putEntryConditionallyIntoHAContainer(incoming);
+  }
+
+  @Test
+  public void testPutEventInHARegion_HAEventWrapper_QueueNotInitialized() throws Exception {
+    // Scenario: the wrapper is put while the queue still reports itself as uninitialized.
+    final long queuePosition = 0L;
+    HARegionQueue queue =
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+
+    // Flag the queue as not yet initialized. This is done before spying so that the spy is
+    // created from a queue that already carries the cleared flag.
+    queue.initialized.set(false);
+    HARegionQueue queueSpy = Mockito.spy(queue);
+
+    // The container step is deliberately left unstubbed; it is verified as unused below.
+    HAEventWrapper wrapper = new HAEventWrapper(mock(EventID.class));
+
+    // The put must hand back the wrapper it was given ...
+    Conflatable returned = queueSpy.putEventInHARegion(wrapper, queuePosition);
+    Assert.assertEquals(wrapper, returned);
+
+    // ... and store it in the region at the requested position.
+    HAEventWrapper stored = (HAEventWrapper) queueSpy.getRegion().get(queuePosition);
+    Assert.assertEquals(wrapper, stored);
+
+    // putEntryConditionallyIntoHAContainer should not be called if the queue isn't yet
+    // initialized
+    verify(queueSpy, times(0)).putEntryConditionallyIntoHAContainer(wrapper);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testPutEventInHARegion_HAEventWrapper_NullClientUpdateMessage() throws Exception {
+    HARegionQueue regionQueue =
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+    // A wrapper without a ClientUpdateMessage is expected to be rejected by the put.
+    HAEventWrapper haEventWrapperWithNullCUMI = new HAEventWrapper(mock(EventID.class));
+    haEventWrapperWithNullCUMI.setClientUpdateMessage(null);
+
+    final long position = 0L;
+    regionQueue.putEventInHARegion(haEventWrapperWithNullCUMI, position);
+  }
+
+  @Test
+  public void testPutEntryConditionallyIntoHAContainer_MultipleThreads_SameWrapperInstanceAndCorrectRefCount()
+      throws Exception {
+    HARegionQueue regionQueue =
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+
+    HAEventWrapper mockHaEventWrapper = mock(HAEventWrapper.class);
+    doReturn(true).when(mockHaEventWrapper).getPutInProgress();
+
+    ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+    doReturn(mockClientUpdateMessage).when(mockHaEventWrapper).getClientUpdateMessage();
+
+    int numClients = 100;
+    Collection<Callable<Conflatable>> concurrentPuts =
+        Collections.nCopies(numClients, (Callable<Conflatable>) () -> regionQueue
+            .putEntryConditionallyIntoHAContainer(mockHaEventWrapper));
+
+    // One thread per put; the pool is always shut down so its threads do not outlive the test.
+    List<Conflatable> conflatables = new ArrayList<>();
+    ExecutorService executorService = Executors.newFixedThreadPool(numClients);
+    try {
+      for (Future<Conflatable> future : executorService.invokeAll(concurrentPuts)) {
+        conflatables.add(future.get());
+      }
+    } finally {
+      executorService.shutdown();
+    }
+
+    // Every put must yield an equal result, with one reference count increment per put.
+    Assert.assertTrue(conflatables.stream().allMatch(conflatables.get(0)::equals));
+    verify(mockHaEventWrapper, times(numClients)).incAndGetReferenceCount();
+    Assert.assertEquals(mockClientUpdateMessage, regionQueue.haContainer.get(mockHaEventWrapper));
+  }
+
+  @Test
+  public void testPutEntryConditionallyIntoHAContainer_AddCQAndInterestList() throws Exception {
+    final String haRegionName = testName.getMethodName();
+
+    HARegionQueue regionQueue =
+        createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+    ClientProxyMembershipID mockClientProxyMembershipId = mock(ClientProxyMembershipID.class);
+    CacheClientProxy mockCacheClientProxy = mock(CacheClientProxy.class);
+    // Register the mocked proxy in the queue's HA container under the HA region name.
+    doReturn(mockClientProxyMembershipId).when(mockCacheClientProxy).getProxyID();
+    ((HAContainerWrapper) regionQueue.haContainer).putProxy(haRegionName, mockCacheClientProxy);
+
+    ClientUpdateMessageImpl.ClientCqConcurrentMap mockClientCqConcurrentMap =
+        mock(ClientUpdateMessageImpl.ClientCqConcurrentMap.class);
+    ClientUpdateMessageImpl.CqNameToOp mockCqNameToOp =
+        mock(ClientUpdateMessageImpl.CqNameToOp.class);
+
+    doReturn(mockCqNameToOp).when(mockClientCqConcurrentMap).get(mockClientProxyMembershipId);
+
+    ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+    doReturn(true).when(mockClientUpdateMessage)
+        .isClientInterestedInUpdates(mockClientProxyMembershipId);
+
+    HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class);
+    doReturn(mockClientUpdateMessage).when(mockHAEventWrapper).getClientUpdateMessage();
+    doReturn(mockClientCqConcurrentMap).when(mockHAEventWrapper).getClientCqs();
+
+    // Report a put as in progress. The wrapper is a mock, so this has to be stubbed on the
+    // getter; calling incrementPutInProgressCounter() on a mock would have no effect.
+    doReturn(true).when(mockHAEventWrapper).getPutInProgress();
+
+    // TODO: Why don't we add CQs and Interest when we initially add the
+    // wrapper to the container?
+    regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);
+    regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);
+    // After two puts, exactly one CQ addition and one interest addition (flag true) are expected.
+    verify(mockClientUpdateMessage, times(1)).addClientCqs(mockClientProxyMembershipId,
+        mockCqNameToOp);
+    verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId,
+        true);
+
+    // Mock that the ClientUpdateMessage is only interested in invalidates, then do another put
+    doReturn(false).when(mockClientUpdateMessage)
+        .isClientInterestedInUpdates(mockClientProxyMembershipId);
+    doReturn(true).when(mockClientUpdateMessage)
+        .isClientInterestedInInvalidates(mockClientProxyMembershipId);
+
+    regionQueue.putEntryConditionallyIntoHAContainer(mockHAEventWrapper);
+    // With interest in invalidates only, the interest must be added with the flag set to false.
+    verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId,
+        false);
+  }
+
+  @Test
+  public void testDecAndRemoveFromHAContainer_WrapperInContainer() throws Exception {
+    // The container resolves the wrapper to itself, i.e. this exact instance is the stored key.
+    HAEventWrapper wrapper = mock(HAEventWrapper.class);
+    HAContainerWrapper container = mock(HAContainerWrapper.class);
+    doReturn(wrapper).when(container).getKey(wrapper);
+
+    // Swap the queue's HA container for the mock before exercising the method.
+    HARegionQueue queue =
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+    queue.haContainer = container;
+
+    queue.decAndRemoveFromHAContainer(wrapper);
+
+    // decAndGetReferenceCount() is left unstubbed on the mocked wrapper, so it yields Mockito's
+    // default of 0; the test expects exactly one removal from the container in that case.
+    verify(container, times(1)).remove(wrapper);
+  }
+
+  @Test
+  public void testDecAndRemoveFromHAContainer_RemoteWrapperNotInContainer_Removed()
+      throws Exception {
+    // The key the container returns for the remote wrapper is a different instance than the
+    // wrapper that is passed in.
+    HAEventWrapper keyInContainer = mock(HAEventWrapper.class);
+    HAEventWrapper remoteWrapper = mock(HAEventWrapper.class);
+    HAContainerWrapper container = mock(HAContainerWrapper.class);
+    doReturn(keyInContainer).when(container).getKey(remoteWrapper);
+
+    // Swap the queue's HA container for the mock before exercising the method.
+    HARegionQueue queue =
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+    queue.haContainer = container;
+
+    queue.decAndRemoveFromHAContainer(remoteWrapper);
+
+    // Removal must target the instance held by the container, not the one passed in.
+    verify(container, times(1)).remove(keyInContainer);
+  }
+
+  @Test
+  public void testDecAndRemoveFromHAContainer_DecrementedButNotRemoved() throws Exception {
+    HAEventWrapper wrapper = mock(HAEventWrapper.class);
+    HAContainerWrapper container = mock(HAContainerWrapper.class);
+    doReturn(wrapper).when(container).getKey(wrapper);
+
+    // The decrement reports that one reference is still outstanding.
+    doReturn(1L).when(wrapper).decAndGetReferenceCount();
+
+    // Swap the queue's HA container for the mock before exercising the method.
+    HARegionQueue queue =
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+    queue.haContainer = container;
+
+    queue.decAndRemoveFromHAContainer(wrapper);
+
+    // The count is decremented exactly once, but the entry has to stay in the container.
+    verify(wrapper, times(1)).decAndGetReferenceCount();
+    verify(container, times(0)).remove(wrapper);
+  }
+
+  @Test
+  public void testDecAndRemoveFromHAContainer_AlreadyRemoved() throws Exception {
+    HAEventWrapper keyInContainer = mock(HAEventWrapper.class);
+    HAEventWrapper remoteWrapper = mock(HAEventWrapper.class);
+
+    // The container no longer returns any key for the remote wrapper.
+    HAContainerWrapper container = mock(HAContainerWrapper.class);
+    doReturn(null).when(container).getKey(remoteWrapper);
+
+    // Swap the queue's HA container for the mock before exercising the method.
+    HARegionQueue queue =
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+    queue.haContainer = container;
+
+    queue.decAndRemoveFromHAContainer(remoteWrapper);
+
+    // Nothing may be removed or decremented on the other wrapper instance.
+    verify(container, times(0)).remove(keyInContainer);
+    verify(keyInContainer, times(0)).decAndGetReferenceCount();
+  }
+
+  @Test
+  public void testDecAndRemoveFromHAContainer_RefChangedAfterGettingKey() throws Exception {
+    HAEventWrapper originalKey = mock(HAEventWrapper.class);
+    HAEventWrapper replacementKey = mock(HAEventWrapper.class);
+    HAEventWrapper remoteWrapper = mock(HAEventWrapper.class);
+
+    // The first lookup yields the original key; every later lookup yields the replacement. This
+    // simulates a different thread replacing the key between two lookups.
+    HAContainerWrapper container = mock(HAContainerWrapper.class);
+    doReturn(originalKey)
+        .doReturn(replacementKey)
+        .when(container).getKey(remoteWrapper);
+
+    // Swap the queue's HA container for the mock before exercising the method.
+    HARegionQueue queue =
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+    queue.haContainer = container;
+
+    queue.decAndRemoveFromHAContainer(remoteWrapper);
+
+    // Only the replacement key may be decremented and removed; the key seen by the first
+    // lookup must stay untouched.
+    verify(container, times(0)).remove(originalKey);
+    verify(originalKey, times(0)).decAndGetReferenceCount();
+    verify(container, times(1)).remove(replacementKey);
+    verify(replacementKey, times(1)).decAndGetReferenceCount();
+  }
+
+  @Test
+  public void testDecAndRemoveFromHAContainer_MultipleThreadsDecrementing() throws Exception {
+    HARegionQueue regionQueue =
+        createHARegionQueue(testName.getMethodName(), HARegionQueue.BLOCKING_HA_QUEUE);
+
+    HAEventWrapper mockHAEventWrapper = mock(HAEventWrapper.class);
+    HAContainerWrapper mockHAContainer = mock(HAContainerWrapper.class);
+    doReturn(mockHAEventWrapper).when(mockHAContainer).getKey(mockHAEventWrapper);
+    regionQueue.haContainer = mockHAContainer;
+
+    final int numClients = 100;
+    // Simulated reference count: starts at numClients and drops by one per decrement call.
+    // NOTE(review): plain long; presumably the decrement calls are serialized - confirm.
+    doAnswer(new Answer<Long>() {
+      private long mockRefCount = numClients;
+      @Override
+      public Long answer(InvocationOnMock invocation) throws Throwable {
+        return --mockRefCount;
+      }
+    }).when(mockHAEventWrapper).decAndGetReferenceCount();
+
+    Collection<Callable<Void>> concurrentDecAndRemoves =
+        Collections.nCopies(numClients, (Callable<Void>) () -> {
+          regionQueue.decAndRemoveFromHAContainer(mockHAEventWrapper);
+          return null;
+        });
+
+    // One thread per call; the pool is always shut down so its threads do not outlive the test.
+    ExecutorService executorService = Executors.newFixedThreadPool(numClients);
+    try {
+      for (Future<Void> future : executorService.invokeAll(concurrentDecAndRemoves)) {
+        future.get();
+      }
+    } finally {
+      executorService.shutdown();
+    }
+
+    verify(mockHAEventWrapper, times(numClients)).decAndGetReferenceCount();
+    verify(mockHAContainer, times(1)).remove(mockHAEventWrapper);
+  }
+
+  @Test
+  public void testPutEntryConditionallyIntoHAContainerUpdatesInterestList() throws Exception {
+    final String haRegionName = testName.getMethodName();
+
+    HARegionQueue regionQueue =
+        createHARegionQueue(haRegionName, HARegionQueue.BLOCKING_HA_QUEUE);
+
+    ClientProxyMembershipID mockClientProxyMembershipId = mock(ClientProxyMembershipID.class);
+    CacheClientProxy mockCacheClientProxy = mock(CacheClientProxy.class);
+    // Register the mocked proxy in the queue's HA container under the HA region name.
+    doReturn(mockClientProxyMembershipId).when(mockCacheClientProxy).getProxyID();
+    ((HAContainerWrapper) regionQueue.haContainer).putProxy(haRegionName, mockCacheClientProxy);
+
+    EventID mockEventID = mock(EventID.class);
+    ClientUpdateMessageImpl mockClientUpdateMessage = mock(ClientUpdateMessageImpl.class);
+    mockClientUpdateMessage.setEventIdentifier(mockEventID);
+    // NOTE(review): the message is a mock, so the setter call above is presumably a no-op - confirm
+    doReturn(true).when(mockClientUpdateMessage)
+        .isClientInterestedInUpdates(mockClientProxyMembershipId);
+
+    HAEventWrapper originalHAEventWrapper = new HAEventWrapper(mockEventID);
+    originalHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage);
+
+    // allow putInProgress to be false (so we null out the msg field in the wrapper)
+    regionQueue.putEntryConditionallyIntoHAContainer(originalHAEventWrapper);
+
+    // the initial haContainer.putIfAbsent() doesn't need to invoke addClientInterestList
+    // as it is already part of the original message
+    verify(mockClientUpdateMessage, times(0)).addClientInterestList(mockClientProxyMembershipId,
+        true);
+
+    // create a new wrapper with the same id and message
+    HAEventWrapper newHAEventWrapper = new HAEventWrapper(mockEventID);
+    newHAEventWrapper.setClientUpdateMessage(mockClientUpdateMessage);
+
+    regionQueue.putEntryConditionallyIntoHAContainer(newHAEventWrapper);
+
+    // Verify that the original haContainerEntry gets the updated clientInterestList
+    verify(mockClientUpdateMessage, times(1)).addClientInterestList(mockClientProxyMembershipId,
+        true);
+  }
+
   /**
    * Wait until a given runnable stops throwing exceptions. It should take at least
    * minimumElapsedTime after the supplied start time to happen.
@@ -1447,7 +1856,15 @@ public class HARegionQueueJUnitTest {
   }
 
   /**
-   * Creates region-queue object
+   * Creates an HA region-queue object of the specified queue type
+   */
+  private HARegionQueue createHARegionQueue(String name, int queueType)
+      throws IOException, ClassNotFoundException, CacheException, InterruptedException {
+    return HARegionQueue.getHARegionQueueInstance(name, cache, queueType, false);
+  }
+
+  /**
+   * Creates a region-queue object with the specified HARegionQueueAttributes
    */
   HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs)
       throws IOException, ClassNotFoundException, CacheException, InterruptedException {
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierIntegrationTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierIntegrationTest.java
new file mode 100644
index 0000000..07073d5
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifierIntegrationTest.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.shiro.subject.Subject;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.Statistics;
+import org.apache.geode.StatisticsType;
+import org.apache.geode.cache.AttributesMutator;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.RegionAttributes;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DSClock;
+import org.apache.geode.distributed.internal.DistributionConfig;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.SystemTimer;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.FilterProfile;
+import org.apache.geode.internal.cache.FilterRoutingInfo;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegionArguments;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.test.junit.categories.ClientSubscriptionTest;
+
+@Category({ClientSubscriptionTest.class})
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({CacheClientProxy.class, DataSerializer.class})
+public class CacheClientNotifierIntegrationTest {
+  @Test
+  public void testCacheClientNotifier_NotifyClients_QRMCausesPrematureRemovalFromHAContainer()
+      throws Exception {
+    final CountDownLatch messageDispatcherInitLatch = new CountDownLatch(1);
+    final CountDownLatch notifyClientLatch = new CountDownLatch(1);
+
+    setupMockMessageDispatcher(messageDispatcherInitLatch, notifyClientLatch);
+
+    InternalCache mockInternalCache = createMockInternalCache();
+
+    CacheClientNotifier cacheClientNotifier =
+        CacheClientNotifier.getInstance(mockInternalCache, mock(CacheServerStats.class),
+            100000, 100000, mock(ConnectionListener.class), null, false);
+
+    final String mockRegionNameProxyOne = "mockHARegionProxyOne";
+    final String mockRegionNameProxyTwo = "mockHARegionProxyTwo";
+
+    CacheClientProxy cacheClientProxyOne =
+        createMockCacheClientProxy(cacheClientNotifier, mockRegionNameProxyOne);
+    CacheClientProxy cacheClientProxyTwo =
+        createMockCacheClientProxy(cacheClientNotifier, mockRegionNameProxyTwo);
+
+    createMockHARegion(mockInternalCache, cacheClientProxyOne, mockRegionNameProxyOne, true);
+    createMockHARegion(mockInternalCache, cacheClientProxyTwo, mockRegionNameProxyTwo, false);
+
+    List<Callable<Void>> initAndNotifyTasks = new ArrayList<>();
+
+    // On one thread, we are initializing the message dispatchers for the two CacheClientProxy
+    // objects. For the second client's initialization, we block until we can simulate a QRM
+    // message while the event sits in the CacheClientProxy's queuedEvents collection. This
+    // blocking logic can be found in setupMockMessageDispatcher().
+    initAndNotifyTasks.add(() -> {
+      cacheClientProxyOne.initializeMessageDispatcher();
+      cacheClientProxyTwo.initializeMessageDispatcher();
+      return null;
+    });
+
+    List<CacheClientProxy> notifiedProxies = new ArrayList<>();
+    notifiedProxies.add(cacheClientProxyOne);
+    notifiedProxies.add(cacheClientProxyTwo);
+    EntryEventImpl mockEntryEventImpl = createMockEntryEvent(notifiedProxies);
+
+    // On a second thread, we are notifying clients of our mocked events. For the first client, we
+    // wait for the message dispatcher to be initialized so the event is put/processed. For the
+    // second client, we ensure that the dispatcher is still initializing so we put the event in
+    // the CacheClientProxy's queuedEvents collection. When the event is handled by the first
+    // CacheClientProxy, we mocked a QRM in the HARegion put to simulate a QRM message being
+    // received at that time. The simulated QRM logic can be found in createMockHARegion().
+    initAndNotifyTasks.add(() -> {
+      messageDispatcherInitLatch.await();
+      try {
+        CacheClientNotifier.notifyClients(mockEntryEventImpl);
+      } finally {
+        // Always release the first thread, even if notifying fails, so the test cannot hang.
+        notifyClientLatch.countDown();
+      }
+      return null;
+    });
+
+    // The pool is always shut down so its threads do not outlive the test.
+    ExecutorService executorService = Executors.newFixedThreadPool(2);
+    try {
+      for (Future<Void> future : executorService.invokeAll(initAndNotifyTasks)) {
+        future.get();
+      }
+    } finally {
+      executorService.shutdown();
+    }
+
+    // Verify that we do not hang in peek() for the second proxy due to the wrapper
+    Awaitility.waitAtMost(new Duration(30, TimeUnit.SECONDS)).until(() -> {
+      try {
+        Object eventPeeked = null;
+        while (eventPeeked == null) {
+          // Simulating message dispatching. We peek() and remove() but aren't testing
+          // the actual message delivery for this test.
+          eventPeeked = cacheClientProxyTwo.getHARegionQueue().peek();
+          if (eventPeeked != null) {
+            cacheClientProxyTwo.getHARegionQueue().remove();
+          }
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw (new RuntimeException(e));
+      }
+    });
+
+    Assert.assertEquals("Expected the HAContainer to be empty", 0,
+        cacheClientNotifier.getHaContainer().size());
+  }
+
+  // Mocks an HARegion that the given cache returns from createVMRegion() for haRegionName.
+  private HARegion createMockHARegion(InternalCache internalCache,
+      CacheClientProxy cacheClientProxy,
+      String haRegionName,
+      boolean simulateQrm)
+      throws IOException, ClassNotFoundException {
+    HARegion mockHARegion = mock(HARegion.class);
+
+    doReturn(internalCache).when(mockHARegion).getCache();
+    doReturn(internalCache).when(mockHARegion).getGemFireCache();
+    doReturn(mock(CancelCriterion.class)).when(mockHARegion).getCancelCriterion();
+    doReturn(mock(AttributesMutator.class)).when(mockHARegion).getAttributesMutator();
+    doReturn(mockHARegion).when(internalCache).createVMRegion(eq(haRegionName),
+        any(RegionAttributes.class), any(InternalRegionArguments.class));
+
+    // We use a mock of the HARegion.put() method to simulate a queue removal message immediately
+    // after the event was successfully put. In production when a queue removal takes place at
+    // this time, it will decrement the ref count on the HAEventWrapper and potentially make it
+    // eligible for removal later on when CacheClientNotifier.checkAndRemoveFromClientMsgsRegion()
+    // is called.
+    // Backing store for the mocked put()/get(), keyed by queue position.
+    Map<Object, Object> events = new HashMap<Object, Object>();
+
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        long position = invocation.getArgument(0);
+        HAEventWrapper haEventWrapper = invocation.getArgument(1);
+
+        if (simulateQrm) {
+          // This call is ultimately what a QRM message will do when it is processed, so we simulate
+          // that here.
+          cacheClientProxy.getHARegionQueue().destroyFromAvailableIDs(position);
+          events.remove(position);
+          cacheClientProxy.getHARegionQueue()
+              .decAndRemoveFromHAContainer((HAEventWrapper) haEventWrapper);
+        } else {
+          events.put(position, haEventWrapper);
+        }
+
+        return null;
+      }
+    }).when(mockHARegion).put(any(long.class), any(HAEventWrapper.class));
+
+    // This is so that when peek() is called, the object is returned. Later we want to verify that
+    // it was successfully "delivered" to the client and subsequently removed from the HARegion.
+    doAnswer(new Answer<Object>() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        return events.get(invocation.getArgument(0));
+      }
+    }).when(mockHARegion).get(any(long.class));
+
+    return mockHARegion;
+  }
+
+  private InternalCache createMockInternalCache() {
+    // One mocked distributed system is returned from both system accessors of the cache.
+    InternalDistributedSystem mockSystem = createMockInternalDistributedSystem();
+
+    InternalCache mockCache = mock(InternalCache.class);
+    doReturn(mock(SystemTimer.class)).when(mockCache).getCCPTimer();
+    doReturn(mock(CancelCriterion.class)).when(mockCache).getCancelCriterion();
+    doReturn(mockSystem).when(mockCache).getInternalDistributedSystem();
+    doReturn(mockSystem).when(mockCache).getDistributedSystem();
+    return mockCache;
+  }
+
+  private void setupMockMessageDispatcher(CountDownLatch messageDispatcherInitLatch,
+      CountDownLatch notifyClientLatch) throws Exception {
+    PowerMockito.whenNew(CacheClientProxy.MessageDispatcher.class).withAnyArguments()
+        .thenAnswer(new Answer<CacheClientProxy.MessageDispatcher>() {
+          private boolean secondClient = false;
+          // Every construction after the first signals the init latch, then waits to be released.
+          @Override
+          public CacheClientProxy.MessageDispatcher answer(
+              org.mockito.invocation.InvocationOnMock invocation) throws Throwable {
+            if (secondClient) {
+              messageDispatcherInitLatch.countDown();
+              notifyClientLatch.await();
+            }
+
+            secondClient = true;
+            // Build the dispatcher from the first two intercepted constructor arguments.
+            CacheClientProxy cacheClientProxy = invocation.getArgument(0);
+            String name = invocation.getArgument(1);
+
+            CacheClientProxy.MessageDispatcher messageDispatcher =
+                new CacheClientProxy.MessageDispatcher(cacheClientProxy,
+                    name);
+
+            return messageDispatcher;
+          }
+        });
+    // NOTE(review): InternalDataSerializer is not listed in this class's @PrepareForTest - confirm
+    PowerMockito.mockStatic(InternalDataSerializer.class);
+  }
+
+  private InternalDistributedSystem createMockInternalDistributedSystem() {
+    // Distribution manager whose only stubbed call hands out a mocked configuration.
+    DistributionManager manager = mock(DistributionManager.class);
+    doReturn(mock(DistributionConfig.class)).when(manager).getConfig();
+
+    InternalDistributedSystem system = mock(InternalDistributedSystem.class);
+    doReturn(manager).when(system).getDistributionManager();
+    doReturn(mock(InternalDistributedMember.class)).when(system).getDistributedMember();
+    doReturn(mock(DSClock.class)).when(system).getClock();
+    doReturn(mock(Statistics.class)).when(system)
+        .createAtomicStatistics(any(StatisticsType.class), any(String.class));
+
+    return system;
+  }
+
+  private CacheClientProxy createMockCacheClientProxy(CacheClientNotifier cacheClientNotifier,
+      String haRegionName)
+      throws IOException {
+    // Membership id that reports the requested HA region name.
+    ClientProxyMembershipID proxyId = mock(ClientProxyMembershipID.class);
+    doReturn(mock(DistributedMember.class)).when(proxyId).getDistributedMember();
+    doReturn(haRegionName).when(proxyId).getHARegionName();
+
+    Socket socket = mock(Socket.class);
+    doReturn(mock(InetAddress.class)).when(socket).getInetAddress();
+
+    // A real proxy built on the mocked collaborators, then passed to addClientInitProxy().
+    CacheClientProxy proxy =
+        new CacheClientProxy(cacheClientNotifier, socket, proxyId, true,
+            (byte) 0, Version.GFE_58, 0, true, mock(SecurityService.class), mock(Subject.class));
+    cacheClientNotifier.addClientInitProxy(proxy);
+
+    return proxy;
+  }
+
+  private EntryEventImpl createMockEntryEvent(List<CacheClientProxy> proxies) {
+    FilterRoutingInfo.FilterInfo mockFilterInfo = mock(FilterRoutingInfo.FilterInfo.class);
+    Set<?> mockInterestedClients = mock(Set.class);
+    doReturn(mockInterestedClients).when(mockFilterInfo).getInterestedClients();
+    doReturn(null).when(mockFilterInfo).getInterestedClientsInv();
+
+    EntryEventImpl mockEntryEventImpl = mock(EntryEventImpl.class);
+    doReturn(mockFilterInfo).when(mockEntryEventImpl).getLocalFilterInfo();
+
+    FilterProfile mockFilterProfile = mock(FilterProfile.class);
+    Set<ClientProxyMembershipID> realClientIDs = new HashSet<>();
+    // A real (non-mock) set holding the proxy id of every proxy passed in.
+    for (CacheClientProxy proxy : proxies) {
+      realClientIDs.add(proxy.getProxyID());
+    }
+
+    doReturn(realClientIDs).when(mockFilterProfile).getRealClientIDs(any(Collection.class));
+
+    LocalRegion mockLocalRegion = mock(LocalRegion.class);
+    doReturn(mockFilterProfile).when(mockLocalRegion).getFilterProfile();
+    // The mocked event reports a CREATE on the mocked region, with a mocked event id.
+    doReturn(mockLocalRegion).when(mockEntryEventImpl).getRegion();
+    doReturn(EnumListenerEvent.AFTER_CREATE).when(mockEntryEventImpl).getEventType();
+    doReturn(Operation.CREATE).when(mockEntryEventImpl).getOperation();
+    EventID mockEventId = mock(EventID.class);
+    doReturn(new byte[] {1}).when(mockEventId).getMembershipID();
+    doReturn(mockEventId).when(mockEntryEventImpl).getEventId();
+
+    return mockEntryEventImpl;
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index cafc43f..ad1baab 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -192,7 +192,7 @@ public class HARegion extends DistributedRegion {
 
       // <HA overflow>
       if (conflatable instanceof HAEventWrapper) {
-        this.owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable);
+        this.owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Invalidate");
       }
       // </HA overflow>
       // update the stats
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HAContainerMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HAContainerMap.java
index 99b40ea..9a8ed7d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HAContainerMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HAContainerMap.java
@@ -53,8 +53,6 @@ public class HAContainerMap implements HAContainerWrapper {
   }
 
   public Object putProxy(String haName, CacheClientProxy proxy) {
-    // InternalDistributedSystem.getLoggerI18n().info(LocalizedStrings.DEBUG, "adding proxy for " +
-    // haName + ": " + proxy, new Exception("stack trace"));
     return haRegionNameToProxy.put(haName, proxy);
   }
 
@@ -63,8 +61,6 @@ public class HAContainerMap implements HAContainerWrapper {
   }
 
   public Object removeProxy(String haName) {
-    // InternalDistributedSystem.getLoggerI18n().info(LocalizedStrings.DEBUG, "removing proxy for "
-    // + haName, new Exception("stack trace"));
     return haRegionNameToProxy.remove(haName);
   }
 
@@ -91,12 +87,10 @@ public class HAContainerMap implements HAContainerWrapper {
   }
 
   public boolean containsValue(Object value) {
-    // return map.containsValue(value);
     throw new UnsupportedOperationException("containsValue() not supported.");
   }
 
   public Set entrySet() {
-    // return map.entrySet();
     throw new UnsupportedOperationException("entrySet() not supported.");
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 6928b8d..8c241b2 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -609,10 +609,20 @@ public class HARegionQueue implements RegionQueue {
           logger.debug("{}: adding message to GII queue of size {}: {}", this.regionName,
               giiQueue.size(), object);
         }
+
+        // Hold a putInProgress count while the event waits in giiQueue; released after basicPut.
         if (object instanceof HAEventWrapper) {
-          // bug #43609 - prevent loss of the message while in the queue
-          putEntryConditionallyIntoHAContainer((HAEventWrapper) object);
+          HAEventWrapper wrapper = (HAEventWrapper) object;
+          wrapper.incrementPutInProgressCounter();
+          if (logger.isDebugEnabled()) {
+            logger
+                .debug("Incremented PutInProgressCounter during GII queueing. Event ID hash code: "
+                    + wrapper.hashCode() + "; System ID hash code: "
+                    + System.identityHashCode(wrapper)
+                    + "; Wrapper details: " + wrapper);
+          }
         }
+
         this.giiQueue.add(object);
       } else {
         if (logger.isTraceEnabled()) {
@@ -764,24 +774,21 @@ public class HARegionQueue implements RegionQueue {
             }
             if (value instanceof HAEventWrapper) {
               if (((HAEventWrapper) value).getClientUpdateMessage() == null) {
-                // if there is no wrapped message look for it in the HA container map
-                ClientUpdateMessageImpl haContainerMessage =
-                    (ClientUpdateMessageImpl) haContainer.get(value);
-                if (haContainerMessage != null) {
-                  ((HAEventWrapper) value).setClientUpdateMessage(haContainerMessage);
-                } else {
-                  if (isDebugEnabled) {
-                    logger.debug(
-                        "{} ATTENTION: found gii queued event with null event message.  Please see bug #44852: {}",
-                        this.regionName, value);
-                  }
-                  continue;
+                if (isDebugEnabled) {
+                  logger.debug(
+                      "{} ATTENTION: found gii queued event with null event message.  Please see bug #44852: {}",
+                      this.regionName, value);
                 }
+                continue;
               }
             }
+
             basicPut(value);
+
+            // The HAEventWrapper putInProgressCounter must be decremented because it was
+            // incremented when it was queued in giiQueue.
             if (value instanceof HAEventWrapper) {
-              decAndRemoveFromHAContainer((HAEventWrapper) value);
+              ((HAEventWrapper) value).decrementPutInProgressCounter();
             }
           } catch (NoSuchElementException ignore) {
             break;
@@ -1038,6 +1045,10 @@ public class HARegionQueue implements RegionQueue {
   void publish(Long position) throws InterruptedException {
     acquireWriteLock();
     try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Adding position " + position + " to available IDs. Region: " + regionName);
+      }
+
       this.idsAvailable.add(position);
       // Notify the waiting peek threads or take threads of blocking queue
       // A void operation for the non blocking queue operations
@@ -1054,11 +1065,16 @@ public class HARegionQueue implements RegionQueue {
   /**
    * @param position Long value present in the Available IDs map against which Event object is
    *        present in HARegion. This function is directly invoked from the basicInvalidate function
-   *        where expiry is aborted if this function returns false
+   *        where
+   *        expiry is aborted if this function returns false
    * @return boolean true if the position could be removed from the Set
    * @throws InterruptedException *
    */
   public boolean destroyFromAvailableIDs(Long position) throws InterruptedException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Removing position " + position + " from available IDs. Region: " + regionName);
+    }
+
     boolean removedOK = false;
     acquireWriteLock();
     try {
@@ -1086,6 +1102,11 @@ public class HARegionQueue implements RegionQueue {
    *         specified was removed from the Set
    */
   protected boolean destroyFromAvailableIDsAndRegion(Long position) throws InterruptedException {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Removing position " + position + " from available IDs and region. Region: "
+          + this.regionName);
+    }
+
     boolean removedOK = this.destroyFromAvailableIDs(position);
 
     if (removedOK) {
@@ -1359,10 +1380,12 @@ public class HARegionQueue implements RegionQueue {
 
   @Override
   public Object peek() throws InterruptedException {
-    if (Thread.interrupted())
+    if (Thread.interrupted()) {
       throw new InterruptedException();
+    }
     Conflatable object = null;
     Long next = null;
+
     while (true) {
       try {
         next = (Long) this.getNextAvailableIDFromList();
@@ -1372,13 +1395,18 @@ public class HARegionQueue implements RegionQueue {
       } catch (TimeoutException ignore) {
         throw new InterruptedException();
       }
+
       object = (Conflatable) this.region.get(next);
+
+      // It is possible for the object to be null if a queue removal
+      // occurred between getting the next available ID and getting the object
+      // from the region. If this happens, on the next iteration of this loop we will
+      // get a different available ID to process
       if (object != null) {
-        // peeked a object, so add the correponding counter to thread-context
         object = (object instanceof HAEventWrapper) ? (Conflatable) this.haContainer.get(object)
             : object;
 
-        if (object != null) { // Is it possible for object to be null...when?
+        if (object != null) {
           List peekedEvents;
           if ((peekedEvents = (List) HARegionQueue.peekedEventsContext.get()) != null) {
             peekedEvents.add(next);
@@ -2111,13 +2139,18 @@ public class HARegionQueue implements RegionQueue {
             continue;
           }
           synchronized (entryHaEventWrapper) {
-            if ((HAEventWrapper) haContainer.getKey(entryHaEventWrapper) != null) {
+            if (entryHaEventWrapper == (HAEventWrapper) haContainer.getKey(entryHaEventWrapper)) {
               entryHaEventWrapper.incAndGetReferenceCount();
               addClientCQsAndInterestList(entryMessage, inputHaEventWrapper, haContainer,
                   regionName);
               inputHaEventWrapper.setClientUpdateMessage(null);
               newValueCd =
                   new VMCachedDeserializable(entryHaEventWrapper, newValueCd.getSizeInBytes());
+              if (logger.isDebugEnabled()) {
+                logger.debug("GII Update of Event ID hash code: " + entryHaEventWrapper.hashCode()
+                    + "; System ID hash code: " + System.identityHashCode(entryHaEventWrapper)
+                    + "; Wrapper details: " + entryHaEventWrapper);
+              }
             } else {
               entryHaEventWrapper = null;
             }
@@ -2127,7 +2160,11 @@ public class HARegionQueue implements RegionQueue {
             inputHaEventWrapper.incAndGetReferenceCount();
             inputHaEventWrapper.setHAContainer(haContainer);
             inputHaEventWrapper.setClientUpdateMessage(null);
-            inputHaEventWrapper.setIsRefFromHAContainer(true);
+            if (logger.isDebugEnabled()) { // trace the wrapper that was just added
+              logger.debug("GII Add of Event ID hash code: " + inputHaEventWrapper.hashCode()
+                  + "; System ID hash code: " + System.identityHashCode(inputHaEventWrapper)
+                  + "; Wrapper details: " + inputHaEventWrapper);
+            }
           }
           break;
         }
@@ -2972,6 +3009,8 @@ public class HARegionQueue implements RegionQueue {
       }
 
       boolean rejected = false;
+      Conflatable eventInHARegion = null;
+
       synchronized (this) {
         if (sequenceID > this.lastSequenceIDPut) {
           if (logger.isTraceEnabled()) {
@@ -2999,11 +3038,12 @@ public class HARegionQueue implements RegionQueue {
         if (lastDispatchedSequenceId == TOKEN_DESTROYED) {
           return false;
         }
+
         if (sequenceID > lastDispatchedSequenceId || owningQueue.puttingGIIDataInQueue) {
           // Insert the object into the Region
           Long position = owningQueue.tailKey.incrementAndGet();
 
-          owningQueue.putEventInHARegion(event, position);
+          eventInHARegion = owningQueue.putEventInHARegion(event, position);
 
           // Add the position counter to the LinkedHashSet
           if (this.counters == null) {
@@ -3012,12 +3052,12 @@ public class HARegionQueue implements RegionQueue {
           this.counters.put(position, null);
 
           // Check if the event is conflatable
-          if (owningQueue.shouldBeConflated(event)) {
+          if (owningQueue.shouldBeConflated(eventInHARegion)) {
             // Add to the conflation map & get the position of the
             // old conflatable entry. The old entry may have inserted by the
             // same
             // ThreadIdentifier or different one.
-            oldPosition = owningQueue.addToConflationMap(event, position);
+            oldPosition = owningQueue.addToConflationMap(eventInHARegion, position);
           }
 
           // Take the size lock & add to the list of availabelIds
@@ -3037,7 +3077,7 @@ public class HARegionQueue implements RegionQueue {
           ccn.getClientProxy(owningQueue.clientProxyID).getStatistics().incMessagesFailedQueued();
         }
       } else {
-        owningQueue.entryEnqueued(event);
+        owningQueue.entryEnqueued(eventInHARegion);
       }
       // Remove the old conflated position
       if (oldPosition != null) {
@@ -3073,7 +3113,8 @@ public class HARegionQueue implements RegionQueue {
           }
           // <HA overflow>
           if (conflatable instanceof HAEventWrapper) {
-            owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable);
+            owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable,
+                "Remove Old Conflated Entry");
           }
           // </HA overflow>
           // update statistics
@@ -3227,7 +3268,9 @@ public class HARegionQueue implements RegionQueue {
               if (((HAEventWrapper) event).getReferenceCount() == 0 && logger.isDebugEnabled()) {
                 logger.debug("Reference count is already zero for event {}", event.getEventId());
               }
-              owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) event);
+
+              owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) event,
+                  "Queue Removal Message");
             }
 
             // At this point we know we're going to remove the event,
@@ -3287,7 +3330,7 @@ public class HARegionQueue implements RegionQueue {
           }
           // <HA overflow>
           if (wrapper instanceof HAEventWrapper) {
-            owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) wrapper);
+            owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) wrapper, "Message Dispatcher");
           }
           // </HA overflow>
           owningQueue.stats.incEventsRemoved();
@@ -3428,61 +3471,24 @@ public class HARegionQueue implements RegionQueue {
    *
    * @since GemFire 5.7
    */
-  protected void putEventInHARegion(Conflatable event, Long position) {
+  protected Conflatable putEventInHARegion(Conflatable event, Long position) {
     if (event instanceof HAEventWrapper) {
       HAEventWrapper inputHaEventWrapper = (HAEventWrapper) event;
+      HAEventWrapper haContainerKey = null;
+
       if (this.isQueueInitialized()) {
-        if (inputHaEventWrapper.getIsRefFromHAContainer()) {
-          putEntryConditionallyIntoHAContainer(inputHaEventWrapper);
-        } else {
-          // This means that the haEventWrapper reference we have is not
-          // authentic, i.e. it doesn't refer to the HAEventWrapper instance
-          // in the haContainer, but to the one outside it.
-          HAEventWrapper haContainerKey = null;
-          do {
-            ClientUpdateMessageImpl haContainerEntry =
-                (ClientUpdateMessageImpl) ((HAContainerWrapper) this.haContainer)
-                    .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage());
-            if (haContainerEntry != null) {
-              haContainerKey = (HAEventWrapper) ((HAContainerWrapper) this.haContainer)
-                  .getKey(inputHaEventWrapper);
-              if (haContainerKey == null) {
-                continue;
-              }
-              synchronized (haContainerKey) {
-                // assert the entry is still present
-                if (((HAContainerWrapper) this.haContainer).getKey(haContainerKey) != null) {
-                  haContainerKey.incAndGetReferenceCount();
-                  addClientCQsAndInterestList(haContainerEntry, inputHaEventWrapper,
-                      this.haContainer, this.regionName);
-                  inputHaEventWrapper = haContainerKey;
-                } else {
-                  haContainerKey = null;
-                }
-              }
-            } else {
-              synchronized (inputHaEventWrapper) {
-                inputHaEventWrapper.incAndGetReferenceCount();
-                inputHaEventWrapper.setHAContainer(this.haContainer);
-                if (!inputHaEventWrapper.getPutInProgress()) {
-                  // This means that this is a GII'ed event. Hence we must
-                  // explicitly set 'clientUpdateMessage' to null.
-                  inputHaEventWrapper.setClientUpdateMessage(null);
-                }
-                inputHaEventWrapper.setIsRefFromHAContainer(true);
-              }
-              break;
-            }
-          } while (haContainerKey == null);
-        }
+        haContainerKey = putEntryConditionallyIntoHAContainer(inputHaEventWrapper);
+      } else {
+        haContainerKey = inputHaEventWrapper;
       }
-      // Put the reference to the HAEventWrapper instance into the
-      // HA queue.
+
       if (logger.isDebugEnabled()) {
-        logger.debug("adding inputHaEventWrapper to HARegion at " + position + ":"
-            + inputHaEventWrapper + " for " + this.regionName);
+        logger.debug("adding haContainerKey to HARegion at " + position + ":"
+            + haContainerKey + " for " + this.regionName);
       }
-      this.region.put(position, inputHaEventWrapper);
+      this.region.put(position, haContainerKey);
+
+      return haContainerKey;
     } else { // (event instanceof ClientMarkerMessageImpl OR ConflatableObject OR
              // ClientInstantiatorMessage)
       if (logger.isDebugEnabled()) {
@@ -3490,6 +3496,8 @@ public class HARegionQueue implements RegionQueue {
             + this.regionName);
       }
       this.region.put(position, event);
+
+      return event;
     }
   }
 
@@ -3503,18 +3511,17 @@ public class HARegionQueue implements RegionQueue {
         msg.addClientCqs(proxyID, clientCQ);
       }
     }
-    // if (haEventWrapper.getPutInProgress()) {
-    // ((HAEventWrapper)entry.getKey()).setPutInProgress(true);
-    // }
 
     // This is a remote HAEventWrapper.
     // Add new Interested client lists.
     ClientUpdateMessageImpl clientMsg =
         (ClientUpdateMessageImpl) haEventWrapper.getClientUpdateMessage();
-    if (clientMsg.isClientInterestedInUpdates(proxyID)) {
-      msg.addClientInterestList(proxyID, true);
-    } else if (clientMsg.isClientInterestedInInvalidates(proxyID)) {
-      msg.addClientInterestList(proxyID, false);
+    if (clientMsg != null) {
+      if (clientMsg.isClientInterestedInUpdates(proxyID)) {
+        msg.addClientInterestList(proxyID, true);
+      } else if (clientMsg.isClientInterestedInInvalidates(proxyID)) {
+        msg.addClientInterestList(proxyID, false);
+      }
     }
   }
 
@@ -3522,17 +3529,69 @@ public class HARegionQueue implements RegionQueue {
    * If the wrapper's referenceCount becomes 1 after increment, then set this haEventWrapper and its
    * clientUpdateMessage into the haContainer as <key, value>.
    *
-   * @param haEventWrapper An instance of {@code HAEventWrapper}
+   * @param inputHaEventWrapper An instance of {@code HAEventWrapper}
    * @since GemFire 5.7
    */
-  protected void putEntryConditionallyIntoHAContainer(HAEventWrapper haEventWrapper) {
-    if (haEventWrapper.incAndGetReferenceCount() == 1) {
-      // if (logger.isDebugEnabled()) {
-      // logger.fine("Putting event in haContainer: " + haEventWrapper);
-      // }
-      haEventWrapper.setHAContainer(HARegionQueue.this.haContainer);
-      this.haContainer.put(haEventWrapper, haEventWrapper.getClientUpdateMessage());
+  protected HAEventWrapper putEntryConditionallyIntoHAContainer(
+      HAEventWrapper inputHaEventWrapper) {
+    HAEventWrapper haContainerKey = null;
+
+    while (haContainerKey == null) {
+      ClientUpdateMessageImpl haContainerEntry =
+          (ClientUpdateMessageImpl) ((HAContainerWrapper) this.haContainer)
+              .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage());
+
+      if (haContainerEntry != null) {
+        haContainerKey = (HAEventWrapper) ((HAContainerWrapper) this.haContainer)
+            .getKey(inputHaEventWrapper);
+
+        // Key was already removed from the container, so continue
+        if (haContainerKey == null) {
+          continue;
+        }
+
+        synchronized (haContainerKey) {
+          // assert the entry is still present and we still have the same reference
+          if (haContainerKey == ((HAContainerWrapper) this.haContainer).getKey(haContainerKey)) {
+            haContainerKey.incAndGetReferenceCount();
+
+            addClientCQsAndInterestList(haContainerEntry, inputHaEventWrapper,
+                this.haContainer, this.regionName);
+
+            if (logger.isDebugEnabled()) {
+              logger.debug("Putting updated event in haContainer with Event ID hash code: "
+                  + haContainerKey.hashCode() + "; System ID hash code: "
+                  + System.identityHashCode(haContainerKey)
+                  + "; Wrapper details: " + haContainerKey);
+            }
+          } else {
+            haContainerKey = null;
+          }
+        }
+      } else {
+        synchronized (inputHaEventWrapper) {
+          inputHaEventWrapper.incAndGetReferenceCount();
+          inputHaEventWrapper.setHAContainer(this.haContainer);
+
+          if (!inputHaEventWrapper.getPutInProgress()) {
+            // This means that this is a GII'ed event. Hence we must
+            // explicitly set 'clientUpdateMessage' to null.
+            inputHaEventWrapper.setClientUpdateMessage(null);
+          }
+
+          if (logger.isDebugEnabled()) {
+            logger.debug("Putting new event in haContainer with Event ID hash code: "
+                + inputHaEventWrapper.hashCode()
+                + "; System ID hash code: " + System.identityHashCode(inputHaEventWrapper)
+                + "; Wrapper details: " + inputHaEventWrapper);
+          }
+        }
+
+        haContainerKey = inputHaEventWrapper;
+      }
     }
+
+    return haContainerKey;
   }
 
   /**
@@ -3601,6 +3660,7 @@ public class HARegionQueue implements RegionQueue {
         for (int i = 0; i < wrapperArray.length; i++) {
           wrapperSet.add(this.region.get(wrapperArray[i]));
         }
+
         // Start a new thread which will update the clientMessagesRegion for
         // each of the HAEventWrapper instances present in the wrapperSet
         Thread regionCleanupTask = new Thread(new Runnable() {
@@ -3611,7 +3671,8 @@ public class HARegionQueue implements RegionQueue {
               while (iter.hasNext()) {
                 Conflatable conflatable = (Conflatable) iter.next();
                 if (conflatable instanceof HAEventWrapper) {
-                  HARegionQueue.this.decAndRemoveFromHAContainer((HAEventWrapper) conflatable);
+                  HARegionQueue.this
+                      .decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Destroy");
                 }
               }
             } catch (CancelException ignore) {
@@ -3624,7 +3685,7 @@ public class HARegionQueue implements RegionQueue {
               }
             }
           }
-        });
+        }, "HA Region Cleanup for " + regionName);
         regionCleanupTask.start();
       }
     } catch (CancelException e) {
@@ -3662,7 +3723,7 @@ public class HARegionQueue implements RegionQueue {
       HAEventWrapper wrapper = (HAEventWrapper) conflatable;
       msg = (Conflatable) HARegionQueue.this.haContainer.get(wrapper);
       if (msg != null) {
-        decAndRemoveFromHAContainer(wrapper);
+        decAndRemoveFromHAContainer(wrapper, "GetAndRemoveFromHAContainer");
       }
     } else {
       msg = conflatable;
@@ -3671,23 +3732,44 @@ public class HARegionQueue implements RegionQueue {
   }
 
   /**
-   * IMPORTANT: <br>
-   * The wrapper passed here must be the authentic wrapper, i.e. it must be the one referred by the
-   * HARegion underlying this queue. <br>
-   * Decrements wrapper's reference count by one. If the decremented ref count is zero and put is
-   * not in progress, removes the entry from the haContainer.
+   * Decrements reference count for the wrapper in the container by one. If the decremented ref
+   * count is zero and put is not in progress, removes the entry from the haContainer.
    *
    * @since GemFire 5.7
    */
   public void decAndRemoveFromHAContainer(HAEventWrapper wrapper) {
-    if (wrapper.decAndGetReferenceCount() == 0L && !wrapper.getPutInProgress()) {
-      synchronized (wrapper) {
-        if (wrapper.getReferenceCount() == 0L) {
+    decAndRemoveFromHAContainer(wrapper, "");
+  }
+
+  public void decAndRemoveFromHAContainer(HAEventWrapper wrapper, String caller) {
+    boolean decAndRemovePerformed = false;
+
+    while (!decAndRemovePerformed) {
+      HAEventWrapper haContainerKey =
+          (HAEventWrapper) ((HAContainerWrapper) haContainer).getKey(wrapper);
+
+      if (haContainerKey == null) {
+        break;
+      }
+
+      synchronized (haContainerKey) {
+        if (haContainerKey == (HAEventWrapper) ((HAContainerWrapper) haContainer).getKey(wrapper)) {
           if (logger.isDebugEnabled()) {
-            logger.debug("Removing event from {}: {}", this.region.getFullPath(),
-                wrapper.getEventId());
+            logger.debug(caller + " decremented Event ID hash code: " + haContainerKey.hashCode()
+                + "; System ID hash code: " + System.identityHashCode(haContainerKey)
+                + "; Wrapper details: " + haContainerKey);
+          }
+          if (haContainerKey.decAndGetReferenceCount() == 0L) {
+            HARegionQueue.this.haContainer.remove(haContainerKey);
+            if (logger.isDebugEnabled()) {
+              logger.debug(
+                  caller + " removed Event ID hash code: " + haContainerKey.hashCode()
+                      + "; System ID hash code: "
+                      + System.identityHashCode(haContainerKey)
+                      + "; Wrapper details: " + haContainerKey);
+            }
           }
-          HARegionQueue.this.haContainer.remove(wrapper);
+          decAndRemovePerformed = true;
         }
       }
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
index 43fa076..743cac8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java
@@ -892,9 +892,15 @@ public class CacheClientNotifier {
       }
     } else {
       HAEventWrapper wrapper = new HAEventWrapper(clientMessage);
-      // Set the putInProgress flag to true before starting the put on proxy's
-      // HA queues. Nowhere else, this flag is being set to true.
-      wrapper.setPutInProgress(true);
+      wrapper.incrementPutInProgressCounter();
+
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Initial increment PutInProgressCounter on HAEventWrapper with Event ID hash code: "
+                + wrapper.hashCode() + "; System ID hash code: "
+                + System.identityHashCode(wrapper) + "; Wrapper details: " + wrapper);
+      }
+
       conflatable = wrapper;
     }
 
@@ -990,7 +996,11 @@ public class CacheClientNotifier {
         this.blackListSlowReceiver(proxy);
       }
     }
-    checkAndRemoveFromClientMsgsRegion(conflatable);
+
+    if (conflatable instanceof HAEventWrapper) {
+      ((HAEventWrapper) conflatable).decrementPutInProgressCounter();
+    }
+
     // Remove any dead clients from the clients to notify
     if (deadProxies != null) {
       closeDeadProxies(deadProxies, false);
@@ -1302,47 +1312,6 @@ public class CacheClientNotifier {
   }
 
   /**
-   * If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present
-   * in the haContainer, set the reference to the clientUpdateMessage to null and putInProgress flag
-   * to false. Also, if the ref count is zero, then remove the entry from the haContainer.
-   *
-   * @since GemFire 5.7
-   */
-  private void checkAndRemoveFromClientMsgsRegion(Conflatable conflatable) {
-    if (haContainer == null) {
-      return;
-    }
-    if (conflatable instanceof HAEventWrapper) {
-      HAEventWrapper wrapper = (HAEventWrapper) conflatable;
-      if (!wrapper.getIsRefFromHAContainer()) {
-        wrapper = (HAEventWrapper) haContainer.getKey(wrapper);
-        if (wrapper != null && !wrapper.getPutInProgress()) {
-          synchronized (wrapper) {
-            if (wrapper.getReferenceCount() == 0L) {
-              if (logger.isDebugEnabled()) {
-                logger.debug("Removing event from haContainer: {}", wrapper);
-              }
-              haContainer.remove(wrapper);
-            }
-          }
-        }
-      } else {
-        // This wrapper resides in haContainer.
-        wrapper.setClientUpdateMessage(null);
-        wrapper.setPutInProgress(false);
-        synchronized (wrapper) {
-          if (wrapper.getReferenceCount() == 0L) {
-            if (logger.isDebugEnabled()) {
-              logger.debug("Removing event from haContainer: {}", wrapper);
-            }
-            haContainer.remove(wrapper);
-          }
-        }
-      }
-    }
-  }
-
-  /**
    * Returns the <code>CacheClientProxy</code> associated to the membershipID *
    *
    * @return the <code>CacheClientProxy</code> associated to the membershipID
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
index 15317da..84a67e3 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientProxy.java
@@ -1544,6 +1544,7 @@ public class CacheClientProxy implements ClientSession {
   protected void deliverMessage(Conflatable conflatable) {
     ThreadState state = this.securityService.bindSubject(this.subject);
     ClientUpdateMessage clientMessage = null;
+
     if (conflatable instanceof HAEventWrapper) {
       clientMessage = ((HAEventWrapper) conflatable).getClientUpdateMessage();
     } else {
@@ -1571,7 +1572,22 @@ public class CacheClientProxy implements ClientSession {
                   "Message dispatcher for proxy {} is getting initialized. Adding message to the queuedEvents.",
                   this);
             }
+
+            if (conflatable instanceof HAEventWrapper) {
+              HAEventWrapper haEventWrapper = (HAEventWrapper) conflatable;
+              haEventWrapper.incrementPutInProgressCounter();
+              if (logger.isDebugEnabled()) {
+                logger.debug(
+                    "Incremented PutInProgressCounter on HAEventWrapper with Event ID hash code: "
+                        + haEventWrapper.hashCode()
+                        + "; System ID hash code: "
+                        + System.identityHashCode(haEventWrapper) + "; Wrapper details: "
+                        + haEventWrapper);
+              }
+            }
+
             this.queuedEvents.add(conflatable);
+
             return;
           }
         }
@@ -1581,13 +1597,24 @@ public class CacheClientProxy implements ClientSession {
         this._messageDispatcher.enqueueMessage(conflatable);
       } else {
         this._statistics.incMessagesFailedQueued();
+
         if (logger.isDebugEnabled()) {
           logger.debug(
-              "Message is not added to the queue. Message dispatcher for proxy: {} doesn't exist.",
-              this);
+              "Message was not added to the queue. Message dispatcher was null for proxy: " + this
+                  + ". Event ID hash code: " + conflatable.hashCode() + "; System ID hash code: "
+                  + System.identityHashCode(conflatable) + "; Conflatable details: " + conflatable
+                      .toString());
         }
       }
     } else {
+      if (logger.isDebugEnabled()) {
+        logger.debug(
+            "Message was not added to the queue. Event ID hash code: " + conflatable.hashCode()
+                + "; System ID hash code: "
+                + System.identityHashCode(conflatable) + "; Conflatable details: " + conflatable
+                    .toString());
+      }
+
       this._statistics.incMessagesFailedQueued();
     }
 
@@ -1690,17 +1717,13 @@ public class CacheClientProxy implements ClientSession {
         logger.debug("{} draining {} events from init queue into intialized queue", this,
             this.queuedEvents.size());
       }
-      Conflatable nextEvent;
-      while ((nextEvent = queuedEvents.poll()) != null) {
-        this._messageDispatcher.enqueueMessage(nextEvent);
-      }
+
+      drainQueuedEvents(false);
 
       // Now finish emptying the queue with synchronization to make
       // sure we don't miss any events.
       synchronized (this.queuedEventsSync) {
-        while ((nextEvent = queuedEvents.poll()) != null) {
-          this._messageDispatcher.enqueueMessage(nextEvent);
-        }
+        drainQueuedEvents(true);
 
         this.messageDispatcherInit = false; // Done initialization.
       }
@@ -1711,6 +1734,29 @@ public class CacheClientProxy implements ClientSession {
     }
   }
 
+  private void drainQueuedEvents(boolean withSynchronization) {
+    // Moves every event parked in queuedEvents over to the message dispatcher. The flag only
+    // selects the wording of the debug message; it performs no locking itself.
+    final String syncMode = withSynchronization ? "with" : "without";
+    for (Conflatable queued = queuedEvents.poll(); queued != null; queued = queuedEvents.poll()) {
+      final boolean isWrapper = queued instanceof HAEventWrapper;
+      if (isWrapper && logger.isDebugEnabled()) {
+        logger.debug(
+            "Draining events queued during message dispatcher initialization " + syncMode
+                + " synchronization. Event ID hash code: " + queued.hashCode()
+                + "; System ID hash code: " + System.identityHashCode(queued)
+                + "; Wrapper details: " + queued);
+      }
+
+      this._messageDispatcher.enqueueMessage(queued);
+
+      // Release the put-in-progress hold now that the event was handed to the dispatcher.
+      if (isWrapper) {
+        ((HAEventWrapper) queued).decrementPutInProgressCounter();
+      }
+    }
+  }
+
   protected void startOrResumeMessageDispatcher(boolean processedMarker) {
     // Only start or resume the dispatcher if it is Primary
     if (this.isPrimary) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
index 9a31992..a755efa 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/HAEventWrapper.java
@@ -87,16 +87,13 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
       AtomicLongFieldUpdater.newUpdater(HAEventWrapper.class, "referenceCount");
 
   /**
-   * If true, the entry containing this HAEventWrapper instance will not be removed from the
-   * haContainer, even if the referenceCount value is zero.
+   * If greater than zero, the entry containing this HAEventWrapper instance will not be removed
+   * from the haContainer, even if the referenceCount value is zero.
    */
-  private transient boolean putInProgress = false;
-
-  /**
-   * A value true indicates that this instance is not used in the <code>haContainer</code>. So any
-   * changes in this instance will not be visible to the <code>haContainer</code>.
-   */
-  private transient boolean isRefFromHAContainer = false;
+  @SuppressWarnings("unused")
+  private transient volatile long putInProgressCount;
+  private static final AtomicLongFieldUpdater<HAEventWrapper> putInProgressCountUpdater =
+      AtomicLongFieldUpdater.newUpdater(HAEventWrapper.class, "putInProgressCount");
 
   /**
    * A reference to its <code>ClientUpdateMessage</code> instance.
@@ -119,6 +116,7 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
     this.shouldConflate = event.shouldBeConflated();
     this.eventIdentifier = event.getEventId();
     rcUpdater.set(this, 0);
+    putInProgressCountUpdater.set(this, 0);
     this.clientUpdateMessage = event;
     this.clientCqs = ((ClientUpdateMessageImpl) event).getClientCqs();
   }
@@ -128,6 +126,7 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
     this.clientUpdateMessage = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_CREATE,
         new ClientProxyMembershipID(), eventId);
     rcUpdater.set(this, 0);
+    putInProgressCountUpdater.set(this, 0);
   }
 
   /**
@@ -208,22 +207,6 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
     return this.clientCqs;
   }
 
-  public void setPutInProgress(boolean inProgress) {
-    this.putInProgress = inProgress;
-  }
-
-  public boolean getPutInProgress() {
-    return this.putInProgress;
-  }
-
-  public void setIsRefFromHAContainer(boolean bool) {
-    this.isRefFromHAContainer = bool;
-  }
-
-  public boolean getIsRefFromHAContainer() {
-    return this.isRefFromHAContainer;
-  }
-
   public void setHAContainer(Map container) {
     this.haContainer = container;
   }
@@ -258,12 +241,13 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
   @Override
   public String toString() {
     if (this.clientUpdateMessage != null) {
-      return "HAEventWrapper[refCount=" + getReferenceCount() + "; msg=" + this.clientUpdateMessage
-          + "]";
+      return "HAEventWrapper[refCount=" + getReferenceCount() + "; putInProgress="
+          + putInProgressCountUpdater.get(this) + "; msg=" + this.clientUpdateMessage + "]";
     } else {
       return "HAEventWrapper[region=" + this.regionName + "; key=" + this.keyOfInterest
-          + "; refCount=" + getReferenceCount() + "; inContainer=" + this.isRefFromHAContainer
-          + "; putInProgress=" + this.putInProgress + "; event=" + this.eventIdentifier
+          + "; refCount=" + getReferenceCount()
+          + "; putInProgress=" + putInProgressCountUpdater.get(this) + "; event="
+          + this.eventIdentifier
           + ((this.clientUpdateMessage == null) ? "; no message" : ";with message")
           + ((this.clientUpdateMessage == null) ? ""
               : ("; op=" + this.clientUpdateMessage.getOperation()))
@@ -411,4 +395,35 @@ public class HAEventWrapper implements Conflatable, DataSerializableFixedID, Siz
     return rcUpdater.decrementAndGet(this);
   }
 
+  public long incrementPutInProgressCounter() { // returns the count after this increment
+    return putInProgressCountUpdater.addAndGet(this, 1L);
+  }
+
+  /** Decrements the put-in-progress count; at zero, sets the ClientUpdateMessage to null. */
+  public synchronized long decrementPutInProgressCounter() {
+    final long remaining = putInProgressCountUpdater.decrementAndGet(this);
+
+    if (logger.isDebugEnabled()) {
+      logger.debug("Decremented PutInProgressCounter on HAEventWrapper with Event ID hash code: "
+          + hashCode() + "; System ID hash code: " + System.identityHashCode(this)
+          + "; Wrapper details: " + this);
+    }
+
+    if (remaining != 0L) {
+      return remaining;
+    }
+
+    // The count reached zero: drop this wrapper's reference to its message.
+    if (logger.isDebugEnabled()) {
+      logger.debug("Setting HAEventWrapper ClientUpdateMessage to null.  Event ID hash code: "
+          + hashCode() + "; System ID hash code: " + System.identityHashCode(this)
+          + "; Wrapper details: " + this);
+    }
+    setClientUpdateMessage(null);
+    return remaining;
+  }
+
+  public boolean getPutInProgress() { // true while the put-in-progress count is positive
+    return putInProgressCountUpdater.get(this) >= 1L;
+  }
 }