You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by zh...@apache.org on 2020/08/18 17:49:11 UTC

[geode] branch feature/GEODE-8432 created (now 39ea51e)

This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a change to branch feature/GEODE-8432
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 39ea51e  GEODE-8432: use regionPath directly instead of getRegion when put event into parallelGatewaySenderQueue

This branch includes the following new commits:

     new 39ea51e  GEODE-8432: use regionPath directly instead of getRegion when put event into parallelGatewaySenderQueue

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-8432: use regionPath directly instead of getRegion when put event into parallelGatewaySenderQueue

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhouxj pushed a commit to branch feature/GEODE-8432
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 39ea51eb55f7833ca5a2b867f2acb331895ca1ec
Author: zhouxh <gz...@pivotal.io>
AuthorDate: Tue Aug 18 10:47:35 2020 -0700

    GEODE-8432: use regionPath directly instead of getRegion when put event into parallelGatewaySenderQueue
---
 .../wan/parallel/ParallelGatewaySenderQueue.java   | 16 ++++---
 .../ParallelGatewaySenderQueueJUnitTest.java       | 49 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index df531ce..4e3c9db 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -693,15 +693,19 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
 
     boolean isDREvent = isDREvent(sender.getCache(), value);
 
-    Region region = value.getRegion();
-    String regionPath = null;
-    if (isDREvent) {
-      regionPath = region.getFullPath();
-    } else {
+    String regionPath = value.getRegionPath();
+    if (!isDREvent) {
+      Region region = sender.getCache().getRegion(regionPath);
+      if (region == null) {
+        if (isDebugEnabled) {
+          logger.debug("The PR " + regionPath + " has not finished initializing.");
+        }
+        region = value.getRegion();
+      }
       regionPath = ColocationHelper.getLeaderRegion((PartitionedRegion) region).getFullPath();
     }
     if (isDebugEnabled) {
-      logger.debug("Put is for the region {}", region);
+      logger.debug("Put is for the region {}", regionPath);
     }
     if (!this.userRegionNameToShadowPRMap.containsKey(regionPath)) {
       if (isDebugEnabled) {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index b8acbcf..7f51b7b 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -17,7 +17,9 @@ package org.apache.geode.internal.cache.wan.parallel;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -37,6 +39,7 @@ import org.mockito.stubbing.Answer;
 
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.PartitionAttributes;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
@@ -76,6 +79,52 @@ public class ParallelGatewaySenderQueueJUnitTest {
   }
 
   @Test
+  public void whenReplicatedDataRegionNotReadyShouldNotThrowException() throws Exception {
+    GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+    when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
+    when(event.getRegion()).thenReturn(null);
+    String regionPath = "/testRegion";
+    when(event.getRegionPath()).thenReturn(regionPath);
+    Mockito.doThrow(new IllegalStateException()).when(event).release();
+    Queue backingList = new LinkedList();
+    backingList.add(event);
+
+    queue = spy(queue);
+    doReturn(true).when(queue).isDREvent(any(), any());
+    boolean putDone = queue.put(event);
+    assertThat(putDone).isFalse();
+  }
+
+  @Test
+  public void whenPartitionedDataRegionNotReadyShouldNotThrowException() throws Exception {
+    GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+    when(event.makeHeapCopyIfOffHeap()).thenReturn(event);
+    when(event.getRegion()).thenReturn(null);
+    String regionPath = "/testRegion";
+    when(event.getRegionPath()).thenReturn(regionPath);
+    PartitionedRegion region = mock(PartitionedRegion.class);
+    when(region.getFullPath()).thenReturn(regionPath);
+    when(cache.getRegion(regionPath)).thenReturn(region);
+    PartitionAttributes pa = mock(PartitionAttributes.class);
+    when(region.getPartitionAttributes()).thenReturn(pa);
+    when(pa.getColocatedWith()).thenReturn(null);
+
+    Mockito.doThrow(new IllegalStateException()).when(event).release();
+    Queue backingList = new LinkedList();
+    backingList.add(event);
+
+    BucketRegionQueue bucketRegionQueue = mockBucketRegionQueue(backingList);
+
+    TestableParallelGatewaySenderQueue queue = new TestableParallelGatewaySenderQueue(sender,
+        Collections.emptySet(), 0, 1, metaRegionFactory);
+    queue.setMockedAbstractBucketRegionQueue(bucketRegionQueue);
+
+    queue = spy(queue);
+    boolean putDone = queue.put(event);
+    assertThat(putDone).isFalse();
+  }
+
+  @Test
   public void whenEventReleaseFromOffHeapFailsExceptionShouldNotBeThrownToAckReaderThread()
       throws Exception {
     GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);