You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/10/06 18:53:03 UTC
[geode] 01/02: GEODE-8432: use regionPath directly instead of getRegion when put eve… (#5464)
This is an automated email from the ASF dual-hosted git repository.
zhouxj pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
commit ee4ff0e4660c5d5a26a6d5f31cfa3a83cbde5024
Author: Xiaojian Zhou <ge...@users.noreply.github.com>
AuthorDate: Wed Aug 19 10:05:51 2020 -0700
GEODE-8432: use regionPath directly instead of getRegion when put eve… (#5464)
(cherry picked from commit 6f12a360d82f0de9259557af2bca34cd84e4b5f4)
---
.../wan/parallel/ParallelGatewaySenderQueue.java | 10 ++---
.../ParallelGatewaySenderQueueJUnitTest.java | 49 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 6 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index baac4d4..c7f7095 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -663,15 +663,13 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
boolean isDREvent = isDREvent(sender.getCache(), value);
- Region region = value.getRegion();
- String regionPath = null;
- if (isDREvent) {
- regionPath = region.getFullPath();
- } else {
+ String regionPath = value.getRegionPath();
+ if (!isDREvent) {
+ Region region = sender.getCache().getRegion(regionPath, true);
regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath();
}
if (isDebugEnabled) {
- logger.debug("Put is for the region {}", region);
+ logger.debug("Put is for the region {}", regionPath);
}
if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
if (isDebugEnabled) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index 88c3e45..db2c4b4 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -17,7 +17,9 @@ package org.apache.geode.internal.cache.wan.parallel;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -36,6 +38,7 @@ import org.mockito.stubbing.Answer;
import org.apache.geode.CancelCriterion;
import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributes;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
@@ -71,6 +74,52 @@ public class ParallelGatewaySenderQueueJUnitTest {
}
@Test
+ public void whenReplicatedDataRegionNotReadyShouldNotThrowException() throws Exception {
+ GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+ when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
+ when(event.getRegion()).thenReturn(null);
+ String regionPath = "/testRegion";
+ when(event.getRegionPath()).thenReturn(regionPath);
+ Mockito.doThrow(new IllegalStateException()).when(event).release();
+ Queue backingList = new LinkedList();
+ backingList.add(event);
+
+ queue = spy(queue);
+ doReturn(true).when(queue).isDREvent(any(), any());
+ boolean putDone = queue.put(event);
+ assertThat(putDone).isFalse();
+ }
+
+ @Test
+ public void whenPartitionedDataRegionNotReadyShouldNotThrowException() throws Exception {
+ GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+ when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
+ when(event.getRegion()).thenReturn(null);
+ String regionPath = "/testRegion";
+ when(event.getRegionPath()).thenReturn(regionPath);
+ PartitionedRegion region = mock(PartitionedRegion.class);
+ when(region.getFullPath()).thenReturn(regionPath);
+ when(cache.getRegion(regionPath, true)).thenReturn(region);
+ PartitionAttributes pa = mock(PartitionAttributes.class);
+ when(region.getPartitionAttributes()).thenReturn(pa);
+ when(pa.getColocatedWith()).thenReturn(null);
+
+ Mockito.doThrow(new IllegalStateException()).when(event).release();
+ Queue backingList = new LinkedList();
+ backingList.add(event);
+
+ BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
+
+ TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
+ Collections.emptySet(), 0, 1, metaRegionFactory);
+ queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
+
+ queue = spy(queue);
+ boolean putDone = queue.put(event);
+ assertThat(putDone).isFalse();
+ }
+
+ @Test
public void whenEventReleaseFromOffHeapFailsExceptionShouldNotBeThrownToAckReaderThread()
throws Exception {
GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);