You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2018/04/24 17:26:01 UTC
[geode] branch develop updated: GEODE-4996: Addressed NPE by always
using the region entry key
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new f6dd58f GEODE-4996: Addressed NPE by always using the region entry key
f6dd58f is described below
commit f6dd58f83528b32bb0fb1a44da9bb47ea29f5df3
Author: Barry Oglesby <bo...@users.noreply.github.com>
AuthorDate: Tue Apr 24 10:25:58 2018 -0700
GEODE-4996: Addressed NPE by always using the region entry key
---
.../geode/internal/cache/AbstractRegionMap.java | 77 +-----
.../org/apache/geode/internal/cache/HARegion.java | 4 +
.../geode/internal/cache/ha/HARegionQueue.java | 83 +++++-
.../geode/internal/cache/ha/HARegionDUnitTest.java | 13 +-
.../internal/cache/ha/HARegionQueueDUnitTest.java | 13 +-
.../cache/ha/HARegionQueueIntegrationTest.java | 286 +++++++++++++++++++++
.../cache/tier/sockets/HABug36738DUnitTest.java | 13 +-
7 files changed, 411 insertions(+), 78 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
index 4574c4d..5b1f309 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegionMap.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.internal.cache;
-import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -54,17 +53,12 @@ import org.apache.geode.internal.cache.entries.DiskEntry;
import org.apache.geode.internal.cache.entries.OffHeapRegionEntry;
import org.apache.geode.internal.cache.eviction.EvictableEntry;
import org.apache.geode.internal.cache.eviction.EvictionController;
-import org.apache.geode.internal.cache.ha.HAContainerWrapper;
-import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.internal.cache.map.CacheModificationLock;
import org.apache.geode.internal.cache.map.FocusedRegionMap;
import org.apache.geode.internal.cache.map.RegionMapDestroy;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.cache.region.entry.RegionEntryFactoryBuilder;
-import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
-import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
-import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.RegionVersionVector;
import org.apache.geode.internal.cache.versions.VersionHolder;
@@ -86,7 +80,6 @@ import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.internal.size.ReflectionSingleObjectSizer;
-import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.internal.util.concurrent.ConcurrentMapWithReusableEntries;
import org.apache.geode.internal.util.concurrent.CustomEntryConcurrentHashMap;
@@ -830,73 +823,9 @@ public abstract class AbstractRegionMap
}
if (owner instanceof HARegion && newValue instanceof CachedDeserializable) {
- Object actualVal = null;
- CachedDeserializable newValueCd = (CachedDeserializable) newValue;
- try {
- actualVal = BlobHelper.deserializeBlob(newValueCd.getSerializedValue(),
- sender.getVersionObject(), null);
- newValue = new VMCachedDeserializable(actualVal, newValueCd.getSizeInBytes());
- } catch (IOException | ClassNotFoundException e) {
- throw new RuntimeException("Unable to deserialize HA event for region " + owner);
- }
- if (actualVal instanceof HAEventWrapper) {
- HAEventWrapper haEventWrapper = (HAEventWrapper) actualVal;
- // Key was removed at sender side so not putting it into the HARegion
- if (haEventWrapper.getClientUpdateMessage() == null) {
- return false;
- }
- // Getting the instance from singleton CCN..This assumes only one bridge
- // server in the VM
- HAContainerWrapper haContainer =
- (HAContainerWrapper) CacheClientNotifier.getInstance().getHaContainer();
- if (haContainer == null) {
- return false;
- }
- HAEventWrapper original = null;
- // synchronized (haContainer) {
- do {
- ClientUpdateMessageImpl oldMsg = (ClientUpdateMessageImpl) haContainer
- .putIfAbsent(haEventWrapper, haEventWrapper.getClientUpdateMessage());
- if (oldMsg != null) {
- original = (HAEventWrapper) haContainer.getKey(haEventWrapper);
- if (original == null) {
- continue;
- }
- synchronized (original) {
- if ((HAEventWrapper) haContainer.getKey(original) != null) {
- original.incAndGetReferenceCount();
- HARegionQueue.addClientCQsAndInterestList(oldMsg, haEventWrapper, haContainer,
- owner.getName());
- haEventWrapper.setClientUpdateMessage(null);
- newValue = new VMCachedDeserializable(original, newValueCd.getSizeInBytes());
- } else {
- original = null;
- }
- }
- } else { // putIfAbsent successful
- synchronized (haEventWrapper) {
- haEventWrapper.incAndGetReferenceCount();
- haEventWrapper.setHAContainer(haContainer);
- haEventWrapper.setClientUpdateMessage(null);
- haEventWrapper.setIsRefFromHAContainer(true);
- }
- break;
- }
- // try until we either get a reference to HAEventWrapper from
- // HAContainer or successfully put one into it.
- } while (original == null);
- /*
- * entry = (Map.Entry)haContainer.getEntry(haEventWrapper); if (entry != null) { original =
- * (HAEventWrapper)entry.getKey(); original.incAndGetReferenceCount(); } else {
- * haEventWrapper.incAndGetReferenceCount(); haEventWrapper.setHAContainer(haContainer);
- * haContainer.put(haEventWrapper, haEventWrapper .getClientUpdateMessage());
- * haEventWrapper.setClientUpdateMessage(null);
- * haEventWrapper.setIsRefFromHAContainer(true); } } if (entry != null) {
- * HARegionQueue.addClientCQsAndInterestList(entry, haEventWrapper, haContainer,
- * owner.getName()); haEventWrapper.setClientUpdateMessage(null); newValue =
- * CachedDeserializableFactory.create(original,
- * ((CachedDeserializable)newValue).getSizeInBytes()); }
- */
+ newValue = ((HARegion) owner).updateHAEventWrapper(sender, (CachedDeserializable) newValue);
+ if (newValue == null) {
+ return false;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index 7246822..03c7b7e 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -476,6 +476,10 @@ public class HARegion extends DistributedRegion {
return ((HARegionAdvisor) this.distAdvisor).noPrimaryOrHasRegisteredInterest();
}
+ public Object updateHAEventWrapper(InternalDistributedMember sender,
+ CachedDeserializable newValueCd) {
+ return this.owningQueue.updateHAEventWrapper(sender, newValueCd, getName());
+ }
/** HARegions have their own advisors so that interest registration state can be tracked */
public static class HARegionAdvisor extends CacheDistributionAdvisor {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
index 6d595c2..e69a8af 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/ha/HARegionQueue.java
@@ -79,11 +79,13 @@ import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.Conflatable;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.HARegion;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.VMCachedDeserializable;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.ClientMarkerMessageImpl;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
@@ -96,6 +98,7 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.util.BlobHelper;
import org.apache.geode.internal.util.concurrent.StoppableCondition;
import org.apache.geode.internal.util.concurrent.StoppableReentrantLock;
import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
@@ -2070,6 +2073,84 @@ public class HARegionQueue implements RegionQueue {
}
}
+ public Object updateHAEventWrapper(InternalDistributedMember sender,
+ CachedDeserializable newValueCd, String regionName) {
+ Object inputValue;
+ try {
+ inputValue = BlobHelper.deserializeBlob(newValueCd.getSerializedValue(),
+ sender.getVersionObject(), null);
+ newValueCd = new VMCachedDeserializable(inputValue, newValueCd.getSizeInBytes());
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException("Unable to deserialize HA event for region " + regionName);
+ }
+ if (inputValue instanceof HAEventWrapper) {
+ HAEventWrapper inputHaEventWrapper = (HAEventWrapper) inputValue;
+ // Key was removed at sender side so not putting it into the HARegion
+ if (inputHaEventWrapper.getClientUpdateMessage() == null) {
+ return null;
+ }
+ // Getting the instance from singleton CCN..This assumes only one bridge
+ // server in the VM
+ HAContainerWrapper haContainer =
+ (HAContainerWrapper) CacheClientNotifier.getInstance().getHaContainer();
+ if (haContainer == null) {
+ return null;
+ }
+ HAEventWrapper entryHaEventWrapper = null;
+ // synchronized (haContainer) {
+ do {
+ ClientUpdateMessageImpl entryMessage = (ClientUpdateMessageImpl) haContainer
+ .putIfAbsent(inputHaEventWrapper, inputHaEventWrapper.getClientUpdateMessage());
+ if (entryMessage != null) {
+ entryHaEventWrapper = (HAEventWrapper) haContainer.getKey(inputHaEventWrapper);
+ if (entryHaEventWrapper == null) {
+ continue;
+ }
+ synchronized (entryHaEventWrapper) {
+ if (haContainer.getKey(entryHaEventWrapper) != null) {
+ entryHaEventWrapper.incAndGetReferenceCount();
+ // If the input and entry HAEventWrappers are not the same (which is the normal
+ // case), add the CQs and interest list from the input to the entry and create a new
+ // value from the entry.
+ if (entryHaEventWrapper != inputHaEventWrapper) { // See GEODE-4957
+ addClientCQsAndInterestList(entryMessage, inputHaEventWrapper, haContainer,
+ regionName);
+ inputHaEventWrapper.setClientUpdateMessage(null);
+ newValueCd =
+ new VMCachedDeserializable(entryHaEventWrapper, newValueCd.getSizeInBytes());
+ }
+ } else {
+ entryHaEventWrapper = null;
+ }
+ }
+ } else { // putIfAbsent successful
+ entryHaEventWrapper = (HAEventWrapper) haContainer.getKey(inputHaEventWrapper);
+ synchronized (entryHaEventWrapper) {
+ entryHaEventWrapper.incAndGetReferenceCount();
+ entryHaEventWrapper.setHAContainer(haContainer);
+ // If the input and entry HAEventWrappers are not the same (which is not the normal
+ // case), get the entry message, add the CQs and interest list from the input to the
+ // entry and create a new value from the entry.
+ if (entryHaEventWrapper != inputHaEventWrapper) { // See GEODE-4957
+ entryMessage = (ClientUpdateMessageImpl) haContainer.get(inputHaEventWrapper);
+ addClientCQsAndInterestList(entryMessage, inputHaEventWrapper, haContainer,
+ regionName);
+ inputHaEventWrapper.setClientUpdateMessage(null);
+ newValueCd =
+ new VMCachedDeserializable(entryHaEventWrapper, newValueCd.getSizeInBytes());
+ }
+ entryHaEventWrapper.setClientUpdateMessage(null);
+ entryHaEventWrapper.setIsRefFromHAContainer(true);
+ }
+ break;
+ }
+ // try until we either get a reference to HAEventWrapper from
+ // HAContainer or successfully put one into it.
+ } while (entryHaEventWrapper == null);
+ }
+ return newValueCd;
+ }
+
/**
* This is an implementation of RegionQueue where peek() & take () are blocking operation and will
* not return unless it gets some legitimate value The Lock object used by this class is a
@@ -3440,7 +3521,7 @@ public class HARegionQueue implements RegionQueue {
}
}
- public static void addClientCQsAndInterestList(ClientUpdateMessageImpl msg,
+ private void addClientCQsAndInterestList(ClientUpdateMessageImpl msg,
HAEventWrapper haEventWrapper, Map haContainer, String regionName) {
ClientProxyMembershipID proxyID = ((HAContainerWrapper) haContainer).getProxyID(regionName);
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionDUnitTest.java
index 17ea933..0004539 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionDUnitTest.java
@@ -15,11 +15,15 @@
package org.apache.geode.internal.cache.ha;
import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.Properties;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.AdditionalAnswers;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.CacheFactory;
@@ -213,7 +217,14 @@ public class HARegionDUnitTest extends JUnit4DistributedTestCase {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
- HARegion.getInstance(REGION_NAME, (GemFireCacheImpl) cache, null, factory.create());
+
+ // Mock the HARegionQueue and answer the input CachedDeserializable when updateHAEventWrapper is
+ // called
+ HARegionQueue harq = mock(HARegionQueue.class);
+ when(harq.updateHAEventWrapper(any(), any(), any()))
+ .thenAnswer(AdditionalAnswers.returnsSecondArg());
+
+ HARegion.getInstance(REGION_NAME, (GemFireCacheImpl) cache, harq, factory.create());
}
private static HARegionQueue hrq = null;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
index 5fdd594..78db217 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueDUnitTest.java
@@ -15,6 +15,9 @@
package org.apache.geode.internal.cache.ha;
import static org.apache.geode.test.dunit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.Iterator;
import java.util.List;
@@ -27,6 +30,7 @@ import org.awaitility.Awaitility;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.AdditionalAnswers;
import org.apache.geode.LogWriter;
import org.apache.geode.cache.AttributesFactory;
@@ -264,7 +268,14 @@ public class HARegionQueueDUnitTest extends JUnit4DistributedTestCase {
AttributesFactory factory = new AttributesFactory();
factory.setScope(Scope.DISTRIBUTED_ACK);
factory.setDataPolicy(DataPolicy.REPLICATE);
- HARegion.getInstance("HARegionQueueDUnitTest_region", (GemFireCacheImpl) cache, null,
+
+ // Mock the HARegionQueue and answer the input CachedDeserializable when updateHAEventWrapper is
+ // called
+ HARegionQueue harq = mock(HARegionQueue.class);
+ when(harq.updateHAEventWrapper(any(), any(), any()))
+ .thenAnswer(AdditionalAnswers.returnsSecondArg());
+
+ HARegion.getInstance("HARegionQueueDUnitTest_region", (GemFireCacheImpl) cache, harq,
factory.create());
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
new file mode 100644
index 0000000..6ce921e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueIntegrationTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.ha;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.Version;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.CachedDeserializable;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.EventID;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.HARegion;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegionArguments;
+import org.apache.geode.internal.cache.LocalRegion;
+import org.apache.geode.internal.cache.VMCachedDeserializable;
+import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessage;
+import org.apache.geode.internal.cache.tier.sockets.ClientUpdateMessageImpl;
+import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
+import org.apache.geode.internal.util.BlobHelper;
+import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@RunWith(PowerMockRunner.class)
+@PowerMockIgnore({"javax.script.*", "javax.management.*", "org.springframework.shell.event.*",
+ "org.springframework.shell.core.*", "*.IntegrationTest"})
+@PrepareForTest({CacheClientNotifier.class})
+@Category(IntegrationTest.class)
+public class HARegionQueueIntegrationTest {
+
+ private Cache cache;
+
+ private Region dataRegion;
+
+ private CacheClientNotifier ccn;
+
+ private InternalDistributedMember member;
+
+ private static final int NUM_QUEUES = 100;
+
+ @Before
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+ cache = createCache();
+ dataRegion = createDataRegion();
+ ccn = createCacheClientNotifier();
+ member = createMember();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ cache.close();
+ }
+
+ private Cache createCache() {
+ return new CacheFactory().set(MCAST_PORT, "0").create();
+ }
+
+ private Region createDataRegion() {
+ return cache.createRegionFactory(RegionShortcut.REPLICATE).create("data");
+ }
+
+ private CacheClientNotifier createCacheClientNotifier() {
+ // Create a mock CacheClientNotifier
+ CacheClientNotifier ccn = mock(CacheClientNotifier.class);
+ PowerMockito.mockStatic(CacheClientNotifier.class, Mockito.CALLS_REAL_METHODS);
+ PowerMockito.when(CacheClientNotifier.getInstance()).thenReturn(ccn);
+ return ccn;
+ }
+
+ private InternalDistributedMember createMember() {
+ // Create an InternalDistributedMember
+ InternalDistributedMember member = mock(InternalDistributedMember.class);
+ when(member.getVersionObject()).thenReturn(Version.CURRENT);
+ return member;
+ }
+
+ @Test
+ public void verifySequentialUpdateHAEventWrapperWithMap() throws Exception {
+ // Create a HAContainerMap to be used by the CacheClientNotifier
+ HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+ when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+ // Create a CachedDeserializable
+ CachedDeserializable cd = createCachedDeserializable(haContainerWrapper);
+
+ // Create and update HARegionQueues
+ createAndUpdateHARegionQueuesSequentially(haContainerWrapper, cd, NUM_QUEUES);
+
+ // Verify HAContainerWrapper
+ verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
+ }
+
+ @Test
+ public void verifySimultaneousUpdateHAEventWrapperWithMap() throws Exception {
+ // Create a HAContainerMap to be used by the CacheClientNotifier
+ HAContainerWrapper haContainerWrapper = new HAContainerMap(new ConcurrentHashMap());
+ when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+ // Create a CachedDeserializable
+ CachedDeserializable cd = createCachedDeserializable(haContainerWrapper);
+
+ // Create and update HARegionQueues
+ createAndUpdateHARegionQueuesSimultaneously(haContainerWrapper, cd, NUM_QUEUES);
+
+ // Verify HAContainerWrapper
+ verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
+ }
+
+ @Test
+ public void verifySequentialUpdateHAEventWrapperWithRegion() throws Exception {
+ // Create a HAContainerRegion to be used by the CacheClientNotifier
+ HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+ when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+ // Create a CachedDeserializable
+ CachedDeserializable cd = createCachedDeserializable(haContainerWrapper);
+
+ // Create and update HARegionQueues
+ createAndUpdateHARegionQueuesSequentially(haContainerWrapper, cd, NUM_QUEUES);
+
+ // Verify HAContainerWrapper
+ verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
+ }
+
+ @Test
+ public void verifySimultaneousUpdateHAEventWrapperWithRegion() throws Exception {
+ // Create a HAContainerRegion to be used by the CacheClientNotifier
+ HAContainerWrapper haContainerWrapper = createHAContainerRegion();
+ when(ccn.getHaContainer()).thenReturn(haContainerWrapper);
+
+ // Create a CachedDeserializable
+ CachedDeserializable cd = createCachedDeserializable(haContainerWrapper);
+
+ // Create and update HARegionQueues
+ createAndUpdateHARegionQueuesSimultaneously(haContainerWrapper, cd, NUM_QUEUES);
+
+ // Verify HAContainerWrapper
+ verifyHAContainerWrapper(haContainerWrapper, cd, NUM_QUEUES);
+ }
+
+ private HAContainerRegion createHAContainerRegion() throws Exception {
+ // Create a Region to be used by the HAContainerRegion
+ Region haContainerRegionRegion = createHAContainerRegionRegion();
+
+ // Create an HAContainerRegion
+ HAContainerRegion haContainerRegion = new HAContainerRegion(haContainerRegionRegion);
+
+ return haContainerRegion;
+ }
+
+ private Region createHAContainerRegionRegion() throws Exception {
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.LOCAL);
+ factory.setDiskStoreName(null);
+ factory.setDiskSynchronous(true);
+ factory.setDataPolicy(DataPolicy.NORMAL);
+ factory.setStatisticsEnabled(true);
+ factory.setEvictionAttributes(
+ EvictionAttributes.createLIFOEntryAttributes(1000, EvictionAction.OVERFLOW_TO_DISK));
+ Region region = ((GemFireCacheImpl) cache).createVMRegion(
+ CacheServerImpl.generateNameForClientMsgsRegion(0), factory.create(),
+ new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false)
+ .setSnapshotInputStream(null).setImageTarget(null).setIsUsedForMetaRegion(true));
+ return region;
+ }
+
+ private HARegionQueue createHARegionQueue(Map haContainer, int index) throws Exception {
+ return new HARegionQueue("haRegion+" + index, mock(HARegion.class), (InternalCache) cache,
+ haContainer, null, (byte) 1, true, mock(HARegionQueueStats.class),
+ mock(StoppableReentrantReadWriteLock.class), mock(StoppableReentrantReadWriteLock.class),
+ mock(CancelCriterion.class), false);
+ }
+
+ private CachedDeserializable createCachedDeserializable(HAContainerWrapper haContainerWrapper)
+ throws Exception {
+ // Create ClientUpdateMessage and HAEventWrapper
+ ClientUpdateMessage message = new ClientUpdateMessageImpl(EnumListenerEvent.AFTER_UPDATE,
+ (LocalRegion) dataRegion, "key", "value".getBytes(), (byte) 0x01, null,
+ new ClientProxyMembershipID(), new EventID(cache.getDistributedSystem()));
+ HAEventWrapper wrapper = new HAEventWrapper(message);
+ wrapper.setHAContainer(haContainerWrapper);
+
+ // Create a CachedDeserializable
+ // Note: The haContainerRegion must contain the wrapper and message to serialize it
+ haContainerWrapper.putIfAbsent(wrapper, message);
+ byte[] wrapperBytes = BlobHelper.serializeToBlob(wrapper);
+ CachedDeserializable cd = new VMCachedDeserializable(wrapperBytes);
+ haContainerWrapper.remove(wrapper);
+ assertThat(haContainerWrapper.size()).isEqualTo(0);
+ return cd;
+ }
+
+ private void createAndUpdateHARegionQueuesSequentially(HAContainerWrapper haContainerWrapper,
+ CachedDeserializable cd, int numQueues) throws Exception {
+ // Create some HARegionQueues
+ for (int i = 0; i < numQueues; i++) {
+ HARegionQueue haRegionQueue = createHARegionQueue(haContainerWrapper, i);
+ haRegionQueue.updateHAEventWrapper(member, cd, "haRegion");
+ }
+ }
+
+ private void createAndUpdateHARegionQueuesSimultaneously(HAContainerWrapper haContainerWrapper,
+ CachedDeserializable cd, int numQueues) throws Exception {
+ // Create some HARegionQueues
+ HARegionQueue[] haRegionQueues = new HARegionQueue[numQueues];
+ for (int i = 0; i < numQueues; i++) {
+ haRegionQueues[i] = createHARegionQueue(haContainerWrapper, i);
+ }
+
+ // Create threads to simultaneously update the HAEventWrapper
+ int j = 0;
+ Thread[] threads = new Thread[numQueues];
+ for (HARegionQueue haRegionQueue : haRegionQueues) {
+ threads[j] = new Thread(() -> {
+ haRegionQueue.updateHAEventWrapper(member, cd, "haRegion");
+ });
+ j++;
+ }
+
+ // Start the threads
+ for (int i = 0; i < numQueues; i++) {
+ threads[i].start();
+ }
+
+ // Wait for the threads to complete
+ for (int i = 0; i < numQueues; i++) {
+ threads[i].join();
+ }
+ }
+
+ private void verifyHAContainerWrapper(HAContainerWrapper haContainerWrapper,
+ CachedDeserializable cd, int numQueues) {
+ // Verify HAContainerRegion size
+ assertThat(haContainerWrapper.size()).isEqualTo(1);
+
+ // Verify the refCount is correct
+ HAEventWrapper wrapperInContainer =
+ (HAEventWrapper) haContainerWrapper.getKey(cd.getDeserializedForReading());
+ assertThat(wrapperInContainer.getReferenceCount()).isEqualTo(numQueues);
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HABug36738DUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HABug36738DUnitTest.java
index 0f544e9..c76dd4a 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HABug36738DUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/HABug36738DUnitTest.java
@@ -18,9 +18,13 @@ import static org.apache.geode.test.dunit.Invoke.*;
import static org.awaitility.Awaitility.*;
import static org.awaitility.Duration.*;
import static org.junit.Assert.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.AdditionalAnswers;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.Cache;
@@ -35,6 +39,7 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.HARegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.ha.HAHelper;
+import org.apache.geode.internal.cache.ha.HARegionQueue;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
@@ -113,7 +118,13 @@ public class HABug36738DUnitTest extends JUnit4DistributedTestCase {
factory.setMirrorType(MirrorType.KEYS_VALUES);
factory.setScope(Scope.DISTRIBUTED_ACK);
- haRegion = HARegion.getInstance(HAREGION_NAME, (GemFireCacheImpl) cache, null,
+ // Mock the HARegionQueue and answer the input CachedDeserializable when updateHAEventWrapper is
+ // called
+ HARegionQueue harq = mock(HARegionQueue.class);
+ when(harq.updateHAEventWrapper(any(), any(), any()))
+ .thenAnswer(AdditionalAnswers.returnsSecondArg());
+
+ haRegion = HARegion.getInstance(HAREGION_NAME, (GemFireCacheImpl) cache, harq,
factory.createRegionAttributes());
}
--
To stop receiving notification emails like this one, please contact
boglesby@apache.org.