You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mk...@apache.org on 2021/03/29 06:29:26 UTC
[geode] branch develop updated: GEODE-8918: fix replication in multisite systems (#6008)
This is an automated email from the ASF dual-hosted git repository.
mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 73a2120 GEODE-8918: fix replication in multisite systems (#6008)
73a2120 is described below
commit 73a2120d9c6bca1cd62580414429a40022fe64ca
Author: Mario Kevo <48...@users.noreply.github.com>
AuthorDate: Mon Mar 29 08:28:31 2021 +0200
GEODE-8918: fix replication in multisite systems (#6008)
* GEODE-8918: fix replication in multisite systems
---
.../geode/internal/cache/AbstractRegion.java | 8 ++-
.../geode/internal/cache/AbstractRegionTest.java | 39 ++++++++++++
.../ParallelGatewaySenderOperationsDUnitTest.java | 73 ++++++++++++++++++++++
3 files changed, 118 insertions(+), 2 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
index 8e919fc..3f09662 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractRegion.java
@@ -741,8 +741,8 @@ public abstract class AbstractRegion implements InternalRegion, AttributesMutato
// This is for all regions except pdx Region
if (!isPdxTypesRegion) {
// Make sure we are distributing to only those senders whose id
- // is available on this region
- if (allGatewaySenderIds.contains(sender.getId())) {
+ // is available on this region and whose state is running
+ if (hasRunningGatewaySender(allGatewaySenders, sender)) {
allRemoteDSIds.add(sender.getRemoteDSId());
}
} else { // this else is for PDX region
@@ -1897,6 +1897,10 @@ public abstract class AbstractRegion implements InternalRegion, AttributesMutato
PoolImpl find(String poolName);
}
+ static boolean hasRunningGatewaySender(Set<GatewaySender> senders, GatewaySender sender) { // true only when senders contains sender and sender.isRunning() is true
+ return senders.contains(sender) && sender.isRunning(); // NOTE(review): membership is tested on the GatewaySender object itself, not on sender.getId() - confirm callers pass the intended set
+ }
+
@VisibleForTesting
ConcurrentHashMap<RegionEntry, EntryExpiryTask> getEntryExpiryTasks() {
return entryExpiryTasks;
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionTest.java
index 42b5b0b..48c5037 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/AbstractRegionTest.java
@@ -18,8 +18,12 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.Collections;
+import java.util.Set;
+
import org.junit.Test;
+import org.apache.geode.cache.wan.GatewaySender;
public class AbstractRegionTest {
@@ -34,4 +38,39 @@ public class AbstractRegionTest {
assertThat(mockAbstractRegion.isAllEvents()).isTrue();
assertThat(mockAbstractRegion.cacheTimeMillis()).isEqualTo(millis);
}
+
+ @Test
+ public void hasRunningGatewaySender_returnsFalse_ifSendersIsEmpty() {
+ // An empty sender set cannot contain the sender, so the check is expected to be false.
+ GatewaySender unregisteredSender = mock(GatewaySender.class);
+ Set<GatewaySender> noSenders = Collections.emptySet();
+
+ assertThat(AbstractRegion.hasRunningGatewaySender(noSenders, unregisteredSender)).isFalse();
+ }
+
+ @Test
+ public void hasRunningGatewaySender_returnsFalse_ifSenderIsStopped() {
+ GatewaySender mockSender = mock(GatewaySender.class);
+ // Use a real singleton set: mocking Set needed an unchecked cast and a stubbed contains().
+ Set<GatewaySender> senders = Collections.singleton(mockSender);
+
+ when(mockSender.isRunning()).thenReturn(false);
+
+ boolean value = AbstractRegion.hasRunningGatewaySender(senders, mockSender);
+
+ assertThat(value).isFalse();
+ }
+
+ @Test
+ public void hasRunningGatewaySender_returnsTrue_ifSenderIsRunning() {
+ GatewaySender mockSender = mock(GatewaySender.class);
+ // Use a real singleton set: mocking Set needed an unchecked cast and a stubbed contains().
+ Set<GatewaySender> senders = Collections.singleton(mockSender);
+
+ when(mockSender.isRunning()).thenReturn(true);
+
+ boolean value = AbstractRegion.hasRunningGatewaySender(senders, mockSender);
+
+ assertThat(value).isTrue();
+ }
}
diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index 2842e84..3a8a6a1 100644
--- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -800,6 +800,79 @@ public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
}
@Test
+ public void testMultiSiteReplication() {
+ String site1to2SenderId = "site1to2-sender";
+ String site1to3SenderId = "site1to3-sender";
+ String site2to3SenderId = "site2to3-sender";
+ String regionName = testName.getMethodName();
+ int[] ports = getRandomAvailableTCPPorts(3);
+ int site1Port = ports[0];
+ int site2Port = ports[1];
+ int site3Port = ports[2];
+ Set<String> site1RemoteLocators = // each site's remote-locator set names the other two sites' ports
+ Stream.of("localhost[" + site2Port + "]", "localhost[" + site3Port + "]")
+ .collect(Collectors.toSet());
+ Set<String> site2RemoteLocators =
+ Stream.of("localhost[" + site1Port + "]", "localhost[" + site3Port + "]")
+ .collect(Collectors.toSet());
+ Set<String> site3RemoteLocators =
+ Stream.of("localhost[" + site1Port + "]", "localhost[" + site2Port + "]")
+ .collect(Collectors.toSet());
+
+ // Start 3 sites: one createLocator call each on vm0, vm1 and vm2, passing its own port and the other sites' locators.
+ vm0.invoke(() -> createLocator(1, site1Port,
+ Collections.singleton("localhost[" + site1Port + "]"), site1RemoteLocators));
+ vm1.invoke(() -> createLocator(2, site2Port,
+ Collections.singleton("localhost[" + site2Port + "]"), site2RemoteLocators));
+ vm2.invoke(() -> createLocator(3, site3Port,
+ Collections.singleton("localhost[" + site3Port + "]"), site3RemoteLocators));
+
+ // Create the cache on the 3 sites (vm3 uses site1Port, vm4 uses site2Port, vm5 uses site3Port).
+ createCacheInVMs(site1Port, vm3);
+ createCacheInVMs(site2Port, vm4);
+ createCacheInVMs(site3Port, vm5);
+
+ // Create senders and partitioned region on site 1; both senders are awaited as running before the region is created.
+ vm3.invoke(() -> {
+ createSender(site1to2SenderId, 2, true, 100, 20, false, false, null, false);
+ createSender(site1to3SenderId, 3, true, 100, 20, false, false, null, false);
+ waitForSenderRunningState(site1to2SenderId);
+ waitForSenderRunningState(site1to3SenderId);
+
+ createPartitionedRegion(regionName, String.join(",", site1to2SenderId, site1to3SenderId), 1,
+ 113,
+ isOffHeap());
+ });
+
+ // Create receiver, sender and partitioned region on site 2; the region is attached only to the site 2 -> site 3 sender.
+ vm4.invoke(() -> {
+ createReceiver();
+ createSender(site2to3SenderId, 3, true, 100, 20, false, false, null, false);
+ waitForSenderRunningState(site2to3SenderId);
+
+ createPartitionedRegion(regionName, String.join(",", site2to3SenderId), 1, 113,
+ isOffHeap());
+ });
+
+ // Create receiver and partitioned region on site 3 (no sender ids are passed for this region).
+ vm5.invoke(() -> {
+ createReceiver();
+ createPartitionedRegion(regionName, null, 1, 113, isOffHeap());
+ });
+
+ // Do puts on site 1 (vm3), then validate the region size against 200 on vm3, vm4 and vm5.
+ vm3.invoke(() -> doPuts(regionName, 200));
+ validateRegionSizes(regionName, 200, vm3, vm4, vm5);
+
+ // Stop sender from site 1 to site 3.
+ vm3.invoke(() -> stopSender(site1to3SenderId));
+
+ // Do puts again on site 1; all three VMs are still validated against 1000 (presumably site 3 is now fed via site 2 - confirm).
+ vm3.invoke(() -> doPuts(regionName, 1000));
+ validateRegionSizes(regionName, 1000, vm3, vm4, vm5);
+ }
+
+ @Test
public void testParallelGatewaySenderConcurrentPutClearNoOffheapOrphans()
throws Exception {
MemberVM locator = clusterStartupRule.startLocatorVM(1, new Properties());