You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mk...@apache.org on 2021/03/29 06:29:26 UTC

[geode] branch develop updated: GEODE-8918: fix replication in multisite systems (#6008)

This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 73a2120  GEODE-8918: fix replication in multisite systems (#6008)
73a2120 is described below

commit 73a2120d9c6bca1cd62580414429a40022fe64ca
Author: Mario Kevo <48...@users.noreply.github.com>
AuthorDate: Mon Mar 29 08:28:31 2021 +0200

    GEODE-8918: fix replication in multisite systems (#6008)
    
    * GEODE-8918: fix replication in multisite systems
---
 .../geode/internal/cache/AbstractRegion.java       |  8 ++-
 .../geode/internal/cache/AbstractRegionTest.java   | 39 ++++++++++++
 .../ParallelGatewaySenderOperationsDUnitTest.java  | 73 ++++++++++++++++++++++
 3 files changed, 118 insertions(+), 2 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
index 8e919fc..3f09662 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
@@ -741,8 +741,8 @@ public abstract class AbstractRegion implements InternalRegion, AttributesMutato
         // This is for all regions except pdx Region
         if (!isPdxTypesRegion) {
           // Make sure we are distributing to only those senders whose id
-          // is available on this region
-          if (allGatewaySenderIds.contains(sender.getId())) {
+          // is available on this region and whose state is running
+          if (hasRunningGatewaySender(allGatewaySenders, sender)) {
             allRemoteDSIds.add(sender.getRemoteDSId());
           }
         } else { // this else is for PDX region
@@ -1897,6 +1897,10 @@ public abstract class AbstractRegion implements InternalRegion, AttributesMutato
     PoolImpl find(String poolName);
   }
 
+  static boolean hasRunningGatewaySender(Set<GatewaySender> senders, GatewaySender sender) {
+    return senders.contains(sender) && sender.isRunning();
+  }
+
   @VisibleForTesting
   ConcurrentHashMap<RegionEntry, EntryExpiryTask> getEntryExpiryTasks() {
     return entryExpiryTasks;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionTest.java
index 42b5b0b..48c5037 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionTest.java
@@ -18,8 +18,12 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
+import java.util.Set;
+
 import org.junit.Test;
 
+import org.apache.geode.cache.wan.GatewaySender;
 
 public class AbstractRegionTest {
 
@@ -34,4 +38,39 @@ public class AbstractRegionTest {
     assertThat(mockAbstractRegion.isAllEvents()).isTrue();
     assertThat(mockAbstractRegion.cacheTimeMillis()).isEqualTo(millis);
   }
+
+  @Test
+  public void hasRunningGatewaySender_returnsFalse_ifSendersIsEmpty() {
+    GatewaySender sender = mock(GatewaySender.class);
+
+    boolean value = AbstractRegion.hasRunningGatewaySender(Collections.emptySet(), sender);
+
+    assertThat(value).isFalse();
+  }
+
+  @Test
+  public void hasRunningGatewaySender_returnsFalse_ifSenderIsStopped() {
+    GatewaySender mockSender = mock(GatewaySender.class);
+    Set<GatewaySender> senders = (Set<GatewaySender>) mock(Set.class);
+
+    when(senders.contains(mockSender)).thenReturn(true);
+    when(mockSender.isRunning()).thenReturn(false);
+
+    boolean value = AbstractRegion.hasRunningGatewaySender(senders, mockSender);
+
+    assertThat(value).isFalse();
+  }
+
+  @Test
+  public void hasRunningGatewaySender_returnsTrue_ifSenderIsRunning() {
+    GatewaySender mockSender = mock(GatewaySender.class);
+    Set<GatewaySender> senders = (Set<GatewaySender>) mock(Set.class);
+
+    when(senders.contains(mockSender)).thenReturn(true);
+    when(mockSender.isRunning()).thenReturn(true);
+
+    boolean value = AbstractRegion.hasRunningGatewaySender(senders, mockSender);
+
+    assertThat(value).isTrue();
+  }
 }
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 2842e84..3a8a6a1 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -800,6 +800,79 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
   }
 
   @Test
+  public void testMultiSiteReplication() {
+    String site1to2SenderId = "site1to2-sender";
+    String site1to3SenderId = "site1to3-sender";
+    String site2to3SenderId = "site2to3-sender";
+    String regionName = testName.getMethodName();
+    int[] ports = getRandomAvailableTCPPorts(3);
+    int site1Port = ports[0];
+    int site2Port = ports[1];
+    int site3Port = ports[2];
+    Set<String> site1RemoteLocators =
+        Stream.of("localhost[" + site2Port + "]", "localhost[" + site3Port + "]")
+            .collect(Collectors.toSet());
+    Set<String> site2RemoteLocators =
+        Stream.of("localhost[" + site1Port + "]", "localhost[" + site3Port + "]")
+            .collect(Collectors.toSet());
+    Set<String> site3RemoteLocators =
+        Stream.of("localhost[" + site1Port + "]", "localhost[" + site2Port + "]")
+            .collect(Collectors.toSet());
+
+    // Start 3 sites.
+    vm0.invoke(() -> createLocator(1, site1Port,
+        Collections.singleton("localhost[" + site1Port + "]"), site1RemoteLocators));
+    vm1.invoke(() -> createLocator(2, site2Port,
+        Collections.singleton("localhost[" + site2Port + "]"), site2RemoteLocators));
+    vm2.invoke(() -> createLocator(3, site3Port,
+        Collections.singleton("localhost[" + site3Port + "]"), site3RemoteLocators));
+
+    // Create the cache on the 3 sites.
+    createCacheInVMs(site1Port, vm3);
+    createCacheInVMs(site2Port, vm4);
+    createCacheInVMs(site3Port, vm5);
+
+    // Create senders and partitioned region on site 1.
+    vm3.invoke(() -> {
+      createSender(site1to2SenderId, 2, true, 100, 20, false, false, null, false);
+      createSender(site1to3SenderId, 3, true, 100, 20, false, false, null, false);
+      waitForSenderRunningState(site1to2SenderId);
+      waitForSenderRunningState(site1to3SenderId);
+
+      createPartitionedRegion(regionName, String.join(",", site1to2SenderId, site1to3SenderId), 1,
+          113,
+          isOffHeap());
+    });
+
+    // Create receiver, sender and partitioned region on site 2.
+    vm4.invoke(() -> {
+      createReceiver();
+      createSender(site2to3SenderId, 3, true, 100, 20, false, false, null, false);
+      waitForSenderRunningState(site2to3SenderId);
+
+      createPartitionedRegion(regionName, String.join(",", site2to3SenderId), 1, 113,
+          isOffHeap());
+    });
+
+    // Create receiver and partitioned region on site 3.
+    vm5.invoke(() -> {
+      createReceiver();
+      createPartitionedRegion(regionName, null, 1, 113, isOffHeap());
+    });
+
+    // Do puts
+    vm3.invoke(() -> doPuts(regionName, 200));
+    validateRegionSizes(regionName, 200, vm3, vm4, vm5);
+
+    // Stop sender from site 1 to site 3.
+    vm3.invoke(() -> stopSender(site1to3SenderId));
+
+    // Do puts again
+    vm3.invoke(() -> doPuts(regionName, 1000));
+    validateRegionSizes(regionName, 1000, vm3, vm4, vm5);
+  }
+
+  @Test
   public void testParallelGatewaySenderConcurrentPutClearNoOffheapOrphans()
       throws Exception {
     MemberVM locator = clusterStartupRule.startLocatorVM(1, new Properties());