You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2018/04/24 17:26:01 UTC

[geode] branch develop updated: GEODE-4996: Addressed NPE by always using the region entry key

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new f6dd58f  GEODE-4996: Addressed NPE by always using the region entry key
f6dd58f is described below

commit f6dd58f83528b32bb0fb1a44da9bb47ea29f5df3
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Tue Apr 24 10:25:58 2018 -0700

    GEODE-4996: Addressed NPE by always using the region entry key
---
 .../geode/internal/cache/AbstractRegionMap.java    |  77 +-----
 .../org/apache/geode/internal/cache/HARegion.java  |   4 +
 .../geode/internal/cache/ha/HARegionQueue.java     |  83 +++++-
 .../geode/internal/cache/ha/HARegionDUnitTest.java |  13 +-
 .../internal/cache/ha/HARegionQueueDUnitTest.java  |  13 +-
 .../cache/ha/HARegionQueueIntegrationTest.java     | 286 +++++++++++++++++++++
 .../cache/tier/sockets/HABug36738DUnitTest.java    |  13 +-
 7 files changed, 411 insertions(+), 78 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 4574c4d..5b1f309 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -14,7 +14,6 @@
  */
 package org.apache.geode.internal.cache;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -54,17 +53,12 @@ import org.apache.geode.internal.cache.entries.DiskEntry;
 import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
 import org.apache.geode.internal.cache.eviction.EvictableEntry;
 import org.apache.geode.internal.cache.eviction.EvictionController;
-import org.apache.geode.internal.cache.ha.HAContainerWrapper;
-import org.apache.geode.internal.cache.ha.HARegionQueue;
 import org.apache.geode.internal.cache.map.CacheModificationLock;
 import org.apache.geode.internal.cache.map.FocusedRegionMap;
 import org.apache.geode.internal.cache.map.RegionMapDestroy;
 import org.apache.geode.internal.cache.persistence.DiskRegionView;
 import org.apache.geode.internal.cache.region.entry.RegionEntryFactoryBuilder;
-import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
-import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
-import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
 import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
 import org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.cache.versions.VersionHolder;
@@ -86,7 +80,6 @@ import org.apache.geode.internal.offheap.annotations.Retained;
 import org.apache.geode.internal.offheap.annotations.Unretained;
 import org.apache.geode.internal.sequencelog.EntryLogger;
 import org.apache.geode.internal.size.ReflectionSingleObjectSizer;
-import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntries;
 import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
 
@@ -830,73 +823,9 @@ public abstract class AbstractRegionMap
     }
 
     if (owner instanceof HARegion && newValue instanceof CachedDeserializable) {
-      Object actualVal = null;
-      CachedDeserializable newValueCd = (CachedDeserializable) newValue;
-      try {
-        actualVal = BlobHelper.deserializeBlob(newValueCd.getSerializedValue(),
-            sender.getVersionObject(), null);
-        newValue = new VMCachedDeserializable(actualVal, newValueCd.getSizeInBytes());
-      } catch (IOException | ClassNotFoundException e) {
-        throw new RuntimeException("Unable to deserialize HA event for region " + owner);
-      }
-      if (actualVal instanceof HAEventWrapper) {
-        HAEventWrapper haEventWrapper = (HAEventWrapper) actualVal;
-        // Key was removed at sender side so not putting it into the HARegion
-        if (haEventWrapper.getClientUpdateMessage() == null) {
-          return false;
-        }
-        // Getting the instance from singleton CCN..This assumes only one bridge
-        // server in the VM
-        HAContainerWrapper haContainer =
-            (HAContainerWrapper) CacheClientNotifier.getInstance().getHaContainer();
-        if (haContainer == null) {
-          return false;
-        }
-        HAEventWrapper original = null;
-        // synchronized (haContainer) {
-        do {
-          ClientUpdateMessageImpl oldMsg = (ClientUpdateMessageImpl) haContainer
-              .putIfAbsent(haEventWrapper, haEventWrapper.getClientUpdateMessage());
-          if (oldMsg != null) {
-            original = (HAEventWrapper) haContainer.getKey(haEventWrapper);
-            if (original == null) {
-              continue;
-            }
-            synchronized (original) {
-              if ((HAEventWrapper) haContainer.getKey(original) != null) {
-                original.incAndGetReferenceCount();
-                HARegionQueue.addClientCQsAndInterestList(oldMsg, haEventWrapper, haContainer,
-                    owner.getName());
-                haEventWrapper.setClientUpdateMessage(null);
-                newValue = new VMCachedDeserializable(original, newValueCd.getSizeInBytes());
-              } else {
-                original = null;
-              }
-            }
-          } else { // putIfAbsent successful
-            synchronized (haEventWrapper) {
-              haEventWrapper.incAndGetReferenceCount();
-              haEventWrapper.setHAContainer(haContainer);
-              haEventWrapper.setClientUpdateMessage(null);
-              haEventWrapper.setIsRefFromHAContainer(true);
-            }
-            break;
-          }
-          // try until we either get a reference to HAEventWrapper from
-          // HAContainer or successfully put one into it.
-        } while (original == null);
-        /*
-         * entry = (Map.Entry)haContainer.getEntry(haEventWrapper); if (entry != null) { original =
-         * (HAEventWrapper)entry.getKey(); original.incAndGetReferenceCount(); } else {
-         * haEventWrapper.incAndGetReferenceCount(); haEventWrapper.setHAContainer(haContainer);
-         * haContainer.put(haEventWrapper, haEventWrapper .getClientUpdateMessage());
-         * haEventWrapper.setClientUpdateMessage(null);
-         * haEventWrapper.setIsRefFromHAContainer(true); } } if (entry != null) {
-         * HARegionQueue.addClientCQsAndInterestList(entry, haEventWrapper, haContainer,
-         * owner.getName()); haEventWrapper.setClientUpdateMessage(null); newValue =
-         * CachedDeserializableFactory.create(original,
-         * ((CachedDeserializable)newValue).getSizeInBytes()); }
-         */
+      newValue = ((HARegion) owner).updateHAEventWrapper(sender, (CachedDeserializable) newValue);
+      if (newValue == null) {
+        return false;
       }
     }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index 7246822..03c7b7e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -476,6 +476,10 @@ public class HARegion extends DistributedRegion {
     return ((HARegionAdvisor) this.distAdvisor).noPrimaryOrHasRegisteredInterest();
   }
 
+  public Object updateHAEventWrapper(InternalDistributedMember sender,
+      CachedDeserializable newValueCd) { // delegates to owningQueue with this region's name
+    return this.owningQueue.updateHAEventWrapper(sender, newValueCd, getName());
+  }
 
   /** HARegions have their own advisors so that interest registration state can be tracked */
   public static class HARegionAdvisor extends CacheDistributionAdvisor {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 6d595c2..e69a8af 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -79,11 +79,13 @@ import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.DataSerializableFixedID;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.Conflatable;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.HARegion;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.VMCachedDeserializable;
 import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
 import org.apache.geode.internal.cache.tier.sockets.ClientMarkerMessageImpl;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -96,6 +98,7 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.internal.util.concurrent.StoppableCondition;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
 import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
@@ -2070,6 +2073,84 @@ public class HARegionQueue implements RegionQueue {
     }
   }
 
+  /**
+   * Deserializes an incoming HA value and registers any HAEventWrapper with the haContainer.
+   * @return the value to store, or null if the message was removed or there is no haContainer
+   * @throws RuntimeException if the serialized value cannot be deserialized
+   */
+  public Object updateHAEventWrapper(InternalDistributedMember sender,
+      CachedDeserializable newValueCd, String regionName) {
+    Object inputValue;
+    try {
+      inputValue = BlobHelper.deserializeBlob(newValueCd.getSerializedValue(),
+          sender.getVersionObject(), null);
+      newValueCd = new VMCachedDeserializable(inputValue, newValueCd.getSizeInBytes());
+    } catch (IOException | ClassNotFoundException e) {
+      throw new RuntimeException("Unable to deserialize HA event for region " + regionName, e);
+    }
+    if (inputValue instanceof HAEventWrapper) {
+      HAEventWrapper inputHaEventWrapper = (HAEventWrapper) inputValue;
+      // Key was removed at sender side so not putting it into the HARegion
+      if (inputHaEventWrapper.getClientUpdateMessage() == null) {
+        return null;
+      }
+      // Uses the singleton CacheClientNotifier; this assumes only one bridge server in the VM
+      HAContainerWrapper haContainer =
+          (HAContainerWrapper) CacheClientNotifier.getInstance().getHaContainer();
+      if (haContainer == null) {
+        return null;
+      }
+      HAEventWrapper entryHaEventWrapper = null;
+      do {
+        ClientUpdateMessageImpl entryMessage = (ClientUpdateMessageImpl) haContainer
+            .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage());
+        if (entryMessage != null) {
+          entryHaEventWrapper = (HAEventWrapper) haContainer.getKey(inputHaEventWrapper);
+          if (entryHaEventWrapper == null) {
+            continue;
+          }
+          synchronized (entryHaEventWrapper) {
+            if (haContainer.getKey(entryHaEventWrapper) != null) {
+              entryHaEventWrapper.incAndGetReferenceCount();
+              // Normal case: input and entry are distinct instances, so merge the input's CQs and
+              // interest list into the entry and make the entry the new value.
+              if (entryHaEventWrapper != inputHaEventWrapper) { // See GEODE-4957
+                addClientCQsAndInterestList(entryMessage, inputHaEventWrapper, haContainer,
+                    regionName);
+                inputHaEventWrapper.setClientUpdateMessage(null);
+                newValueCd =
+                    new VMCachedDeserializable(entryHaEventWrapper, newValueCd.getSizeInBytes());
+              }
+            } else {
+              entryHaEventWrapper = null;
+            }
+          }
+        } else { // putIfAbsent successful
+          entryHaEventWrapper = (HAEventWrapper) haContainer.getKey(inputHaEventWrapper);
+          synchronized (entryHaEventWrapper) {
+            entryHaEventWrapper.incAndGetReferenceCount();
+            entryHaEventWrapper.setHAContainer(haContainer);
+            // Unusual case: the container key is a different instance from the input, so fetch the
+            // entry's message, merge the input's CQs and interest list into it and use the entry.
+            if (entryHaEventWrapper != inputHaEventWrapper) { // See GEODE-4957
+              entryMessage = (ClientUpdateMessageImpl) haContainer.get(inputHaEventWrapper);
+              addClientCQsAndInterestList(entryMessage, inputHaEventWrapper, haContainer,
+                  regionName);
+              inputHaEventWrapper.setClientUpdateMessage(null);
+              newValueCd =
+                  new VMCachedDeserializable(entryHaEventWrapper, newValueCd.getSizeInBytes());
+            }
+            entryHaEventWrapper.setClientUpdateMessage(null);
+            entryHaEventWrapper.setIsRefFromHAContainer(true);
+          }
+          break;
+        }
+        // Retry until a wrapper is fetched from the haContainer or successfully put into it.
+      } while (entryHaEventWrapper == null);
+    }
+    return newValueCd;
+  }
+
   /**
    * This is an implementation of RegionQueue where peek() & take () are blocking operation and will
    * not return unless it gets some legitimate value The Lock object used by this class is a
@@ -3440,7 +3521,7 @@ public class HARegionQueue implements RegionQueue {
     }
   }
 
-  public static void addClientCQsAndInterestList(ClientUpdateMessageImpl msg,
+  private void addClientCQsAndInterestList(ClientUpdateMessageImpl msg,
       HAEventWrapper haEventWrapper, Map haContainer, String regionName) {
 
     ClientProxyMembershipID proxyID = ((HAContainerWrapper) haContainer).getProxyID(regionName);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionDUnitTest.java
index 17ea933..0004539 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionDUnitTest.java
@@ -15,11 +15,15 @@
 package org.apache.geode.internal.cache.ha;
 
 import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.Properties;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.AdditionalAnswers;
 
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.CacheFactory;
@@ -213,7 +217,14 @@ public class HARegionDUnitTest extends JUnit4DistributedTestCase {
     AttributesFactory factory = new AttributesFactory();
     factory.setScope(Scope.DISTRIBUTED_ACK);
     factory.setDataPolicy(DataPolicy.REPLICATE);
-    HARegion.getInstance(REGION_NAME, (GemFireCacheImpl) cache, null, factory.create());
+
+    // Mock the HARegionQueue and answer the input CachedDeserializable when updateHAEventWrapper is
+    // called
+    HARegionQueue harq = mock(HARegionQueue.class);
+    when(harq.updateHAEventWrapper(any(), any(), any()))
+        .thenAnswer(AdditionalAnswers.returnsSecondArg());
+
+    HARegion.getInstance(REGION_NAME, (GemFireCacheImpl) cache, harq, factory.create());
   }
 
   private static HARegionQueue hrq = null;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
index 5fdd594..78db217 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
@@ -15,6 +15,9 @@
 package org.apache.geode.internal.cache.ha;
 
 import static org.apache.geode.test.dunit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.util.Iterator;
 import java.util.List;
@@ -27,6 +30,7 @@ import org.awaitility.Awaitility;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.AdditionalAnswers;
 
 import org.apache.geode.LogWriter;
 import org.apache.geode.cache.AttributesFactory;
@@ -264,7 +268,14 @@ public class HARegionQueueDUnitTest extends JUnit4DistributedTestCase {
     AttributesFactory factory = new AttributesFactory();
     factory.setScope(Scope.DISTRIBUTED_ACK);
     factory.setDataPolicy(DataPolicy.REPLICATE);
-    HARegion.getInstance("HARegionQueueDUnitTest_region", (GemFireCacheImpl) cache, null,
+
+    // Mock the HARegionQueue and answer the input CachedDeserializable when updateHAEventWrapper is
+    // called
+    HARegionQueue harq = mock(HARegionQueue.class);
+    when(harq.updateHAEventWrapper(any(), any(), any()))
+        .thenAnswer(AdditionalAnswers.returnsSecondArg());
+
+    HARegion.getInstance("HARegionQueueDUnitTest_region", (GemFireCacheImpl) cache, harq,
         factory.create());
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
new file mode 100644
index 0000000..6ce921e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.CachedDeserializable;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegionArguments;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.VMCachedDeserializable;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
+import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.internal.util.BlobHelper;
+import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore({"javax.script.*", "javax.management.*", "org.springframework.shell.event.*",
+    "org.springframework.shell.core.*", "*.IntegrationTest"})
+@PrepareForTest({CacheClientNotifier.class})
+@Category(IntegrationTest.class)
+public class HARegionQueueIntegrationTest {
+
+  private Cache cache;
+
+  private Region dataRegion;
+
+  private CacheClientNotifier ccn;
+
+  private InternalDistributedMember member;
+
+  private static final int NUM_QUEUES = 100;
+
+  @Before
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    cache = createCache();
+    dataRegion = createDataRegion();
+    ccn = createCacheClientNotifier();
+    member = createMember();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    cache.close();
+  }
+
+  private Cache createCache() {
+    return new CacheFactory().set(MCAST_PORT, "0").create();
+  }
+
+  private Region createDataRegion() {
+    return cache.createRegionFactory(RegionShortcut.REPLICATE).create("data");
+  }
+
+  private CacheClientNotifier createCacheClientNotifier() {
+    // Mock CacheClientNotifier and make the static getInstance() return that mock
+    CacheClientNotifier ccn = mock(CacheClientNotifier.class);
+    PowerMockito.mockStatic(CacheClientNotifier.class, Mockito.CALLS_REAL_METHODS);
+    PowerMockito.when(CacheClientNotifier.getInstance()).thenReturn(ccn);
+    return ccn;
+  }
+
+  private InternalDistributedMember createMember() {
+    // Mock a member that reports Version.CURRENT as its version object
+    InternalDistributedMember member = mock(InternalDistributedMember.class);
+    when(member.getVersionObject()).thenReturn(Version.CURRENT);
+    return member;
+  }
+
+  @Test
+  public void verifySequentialUpdateHAEventWrapperWithMap() throws Exception {
+    // Create a HAContainerMap to be used by the CacheClientNotifier
+    HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+    when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+    // Create a CachedDeserializable
+    CachedDeserializable cd = createCachedDeserializable(haContainerWrapper);
+
+    // Create and update HARegionQueues
+    createAndUpdateHARegionQueuesSequentially(haContainerWrapper, cd, NUM_QUEUES);
+
+    // Verify HAContainerWrapper
+    verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
+  }
+
+  @Test
+  public void verifySimultaneousUpdateHAEventWrapperWithMap() throws Exception {
+    // Create a HAContainerMap to be used by the CacheClientNotifier
+    HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+    when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+    // Create a CachedDeserializable
+    CachedDeserializable cd = createCachedDeserializable(haContainerWrapper);
+
+    // Create and update HARegionQueues
+    createAndUpdateHARegionQueuesSimultaneously(haContainerWrapper, cd, NUM_QUEUES);
+
+    // Verify HAContainerWrapper
+    verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
+  }
+
+  @Test
+  public void verifySequentialUpdateHAEventWrapperWithRegion() throws Exception {
+    // Create a HAContainerRegion to be used by the CacheClientNotifier
+    HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+    when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+    // Create a CachedDeserializable
+    CachedDeserializable cd = createCachedDeserializable(haContainerWrapper);
+
+    // Create and update HARegionQueues
+    createAndUpdateHARegionQueuesSequentially(haContainerWrapper, cd, NUM_QUEUES);
+
+    // Verify HAContainerWrapper
+    verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
+  }
+
+  @Test
+  public void verifySimultaneousUpdateHAEventWrapperWithRegion() throws Exception {
+    // Create a HAContainerRegion to be used by the CacheClientNotifier
+    HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+    when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+    // Create a CachedDeserializable
+    CachedDeserializable cd = createCachedDeserializable(haContainerWrapper);
+
+    // Create and update HARegionQueues
+    createAndUpdateHARegionQueuesSimultaneously(haContainerWrapper, cd, NUM_QUEUES);
+
+    // Verify HAContainerWrapper
+    verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
+  }
+
+  private HAContainerRegion createHAContainerRegion() throws Exception {
+    // Create a Region to be used by the HAContainerRegion
+    Region haContainerRegionRegion = createHAContainerRegionRegion();
+
+    // Create an HAContainerRegion
+    HAContainerRegion haContainerRegion = new HAContainerRegion(haContainerRegionRegion);
+
+    return haContainerRegion;
+  }
+
+  private Region createHAContainerRegionRegion() throws Exception {
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.LOCAL);
+    factory.setDiskStoreName(null);
+    factory.setDiskSynchronous(true);
+    factory.setDataPolicy(DataPolicy.NORMAL);
+    factory.setStatisticsEnabled(true);
+    factory.setEvictionAttributes(
+        EvictionAttributes.createLIFOEntryAttributes(1000, EvictionAction.OVERFLOW_TO_DISK));
+    Region region = ((GemFireCacheImpl) cache).createVMRegion(
+        CacheServerImpl.generateNameForClientMsgsRegion(0), factory.create(),
+        new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
+            .setSnapshotInputStream(null).setImageTarget(null).setIsUsedForMetaRegion(true));
+    return region;
+  }
+
+  private HARegionQueue createHARegionQueue(Map haContainer, int index) throws Exception {
+    return new HARegionQueue("haRegion+" + index, mock(HARegion.class), (InternalCache) cache,
+        haContainer, null, (byte) 1, true, mock(HARegionQueueStats.class),
+        mock(StoppableReentrantReadWriteLock.class), mock(StoppableReentrantReadWriteLock.class),
+        mock(CancelCriterion.class), false);
+  }
+
+  private CachedDeserializable createCachedDeserializable(HAContainerWrapper haContainerWrapper)
+      throws Exception {
+    // Create ClientUpdateMessage and HAEventWrapper
+    ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+        (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+        new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem()));
+    HAEventWrapper wrapper = new HAEventWrapper(message);
+    wrapper.setHAContainer(haContainerWrapper);
+
+    // Create a CachedDeserializable from the serialized wrapper
+    // Note: The container must hold the wrapper and message to serialize it; removed afterwards
+    haContainerWrapper.putIfAbsent(wrapper, message);
+    byte[] wrapperBytes = BlobHelper.serializeToBlob(wrapper);
+    CachedDeserializable cd = new VMCachedDeserializable(wrapperBytes);
+    haContainerWrapper.remove(wrapper);
+    assertThat(haContainerWrapper.size()).isEqualTo(0);
+    return cd;
+  }
+
+  private void createAndUpdateHARegionQueuesSequentially(HAContainerWrapper haContainerWrapper,
+      CachedDeserializable cd, int numQueues) throws Exception {
+    // Create numQueues HARegionQueues, updating the HAEventWrapper through each one in turn
+    for (int i = 0; i < numQueues; i++) {
+      HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, i);
+      haRegionQueue.updateHAEventWrapper(member, cd, "haRegion");
+    }
+  }
+
+  private void createAndUpdateHARegionQueuesSimultaneously(HAContainerWrapper haContainerWrapper,
+      CachedDeserializable cd, int numQueues) throws Exception {
+    // Create numQueues HARegionQueues up front
+    HARegionQueue[] haRegionQueues = new HARegionQueue[numQueues];
+    for (int i = 0; i < numQueues; i++) {
+      haRegionQueues[i] = createHARegionQueue(haContainerWrapper, i);
+    }
+
+    // Create one thread per queue to update the HAEventWrapper concurrently
+    int j = 0;
+    Thread[] threads = new Thread[numQueues];
+    for (HARegionQueue haRegionQueue : haRegionQueues) {
+      threads[j] = new Thread(() -> {
+        haRegionQueue.updateHAEventWrapper(member, cd, "haRegion");
+      });
+      j++;
+    }
+
+    // Start the threads
+    for (int i = 0; i < numQueues; i++) {
+      threads[i].start();
+    }
+
+    // Wait for the threads to complete
+    for (int i = 0; i < numQueues; i++) {
+      threads[i].join();
+    }
+  }
+
+  private void verifyHAContainerWrapper(HAContainerWrapper haContainerWrapper,
+      CachedDeserializable cd, int numQueues) {
+    // Verify the container holds exactly one entry
+    assertThat(haContainerWrapper.size()).isEqualTo(1);
+
+    // Verify the wrapper in the container has one reference per queue
+    HAEventWrapper wrapperInContainer =
+        (HAEventWrapper) haContainerWrapper.getKey(cd.getDeserializedForReading());
+    assertThat(wrapperInContainer.getReferenceCount()).isEqualTo(numQueues);
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HABug36738DUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HABug36738DUnitTest.java
index 0f544e9..c76dd4a 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HABug36738DUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HABug36738DUnitTest.java
@@ -18,9 +18,13 @@ import static org.apache.geode.test.dunit.Invoke.*;
 import static org.awaitility.Awaitility.*;
 import static org.awaitility.Duration.*;
 import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.AdditionalAnswers;
 
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.Cache;
@@ -35,6 +39,7 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.HARegion;
 import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.ha.HAHelper;
+import org.apache.geode.internal.cache.ha.HARegionQueue;
 import org.apache.geode.test.dunit.Host;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
@@ -113,7 +118,13 @@ public class HABug36738DUnitTest extends JUnit4DistributedTestCase {
     factory.setMirrorType(MirrorType.KEYS_VALUES);
     factory.setScope(Scope.DISTRIBUTED_ACK);
 
-    haRegion = HARegion.getInstance(HAREGION_NAME, (GemFireCacheImpl) cache, null,
+    // Mock the HARegionQueue and answer the input CachedDeserializable when updateHAEventWrapper is
+    // called
+    HARegionQueue harq = mock(HARegionQueue.class);
+    when(harq.updateHAEventWrapper(any(), any(), any()))
+        .thenAnswer(AdditionalAnswers.returnsSecondArg());
+
+    haRegion = HARegion.getInstance(HAREGION_NAME, (GemFireCacheImpl) cache, harq,
         factory.createRegionAttributes());
   }
 

-- 
To stop receiving notification emails like this one, please contact
boglesby@apache.org.