You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/05/31 00:33:25 UTC
[geode] 01/01: GEODE-6821: Shared P2P reader no longer processes
messages on regions with a serial sender inline
This is an automated email from the ASF dual-hosted git repository.
boglesby pushed a commit to branch feature/GEODE-6821
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 3ecdc1f0a10acc7fa45038fb3bcd321b3a712a5b
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Thu May 30 17:30:48 2019 -0700
GEODE-6821: Shared P2P reader no longer processes messages on regions with a serial sender inline
---
...ListenersDifferentPrimariesDistributedTest.java | 317 +++++++++++++++++++++
.../distributed/internal/DistributionMessage.java | 4 +-
.../internal/cache/AbstractUpdateOperation.java | 4 +-
.../apache/geode/internal/cache/BucketRegion.java | 4 +-
.../geode/internal/cache/DestroyOperation.java | 4 +-
.../internal/cache/DistributedCacheOperation.java | 4 +-
.../apache/geode/internal/cache/LocalRegion.java | 10 +-
.../internal/cache/partitioned/DestroyMessage.java | 2 +-
.../cache/partitioned/PartitionMessage.java | 2 +-
.../cache/partitioned/PutAllPRMessage.java | 2 +-
.../internal/cache/partitioned/PutMessage.java | 2 +-
.../cache/partitioned/RemoveAllPRMessage.java | 2 +-
.../internal/cache/CacheOperationMessageTest.java | 4 +-
13 files changed, 337 insertions(+), 24 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/SerialAsyncEventListenersDifferentPrimariesDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/SerialAsyncEventListenersDifferentPrimariesDistributedTest.java
new file mode 100644
index 0000000..fecf7ce
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/SerialAsyncEventListenersDifferentPrimariesDistributedTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.asyncqueue;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.internal.OSProcess;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.AEQTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(AEQTest.class)
+@SuppressWarnings("serial")
+@RunWith(JUnitParamsRunner.class)
+public class SerialAsyncEventListenersDifferentPrimariesDistributedTest implements Serializable {
+
+ private MemberVM locator;
+
+ private MemberVM server1;
+
+ private MemberVM server2;
+
+ private MemberVM server3;
+
+ @Rule
+ public ClusterStartupRule clusterRule = new ClusterStartupRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Rule
+ public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+ private static final Random RANDOM = new Random();
+
+ private static final int NUM_THREADS = 20;
+
+ private static final long TIME_TO_RUN = 30000; // milliseconds
+
+ private static final long TIME_TO_WAIT = 1; // minutes
+
+ @Test
+ @Parameters({"REPLICATE", "PARTITION", "PARTITION_REDUNDANT"})
+ public void testMultithreadedFunctionExecutionsDoingRegionOperations(RegionShortcut shortcut) {
+ // This is a test for GEODE-6821.
+ //
+ // 3 servers each with:
+ // - a function that performs a random region operation on the input region
+ // - a replicated region on which the function is executed
+ // - two regions each with a serial AEQ (the type of region is the 'shortcut' test parameter:
+ // replicate, partition or partition_redundant)
+ //
+ // 1 multi-threaded client that repeatedly executes the function for a specified length of time
+ // with random region names and operations.
+ //
+ // This test will deadlock pretty much immediately if the shared P2P reader thread processes any
+ // replication.
+
+ // Start Locator
+ locator = clusterRule.startLocatorVM(0);
+
+ // Start servers
+ int locatorPort = locator.getPort();
+ server1 = clusterRule.startServerVM(1, s -> s.withConnectionToLocator(locatorPort));
+ server2 = clusterRule.startServerVM(2, s -> s.withConnectionToLocator(locatorPort));
+ server3 = clusterRule.startServerVM(3, s -> s.withConnectionToLocator(locatorPort));
+
+ // Register Function in all servers
+ Stream.of(server1, server2, server3).forEach(server -> server
+ .invoke(() -> FunctionService.registerFunction(new RegionOperationsFunction())));
+
+ // Create AEQ1 in all servers with server1 as primary
+ String aeq1Id = testName.getMethodName() + "_1";
+ Stream.of(server1, server2, server3).forEach(
+ server -> server.invoke(() -> createAsyncEventQueue(aeq1Id, new TestAsyncEventListener())));
+
+ // Create AEQ2 in all servers with server2 as primary
+ String aeq2Id = testName.getMethodName() + "_2";
+ Stream.of(server2, server1, server3).forEach(
+ server -> server.invoke(() -> createAsyncEventQueue(aeq2Id, new TestAsyncEventListener())));
+
+ // Create region the function is executed on in all servers
+ String functionExecutionRegionName = testName.getMethodName() + "_functionExecutor";
+ Stream.of(server1, server2, server3).forEach(
+ server -> server.invoke(() -> createRegion(functionExecutionRegionName, REPLICATE)));
+
+ // Create region attached to AEQ1 in all servers (region type comes from the test parameter)
+ String aeq1RegionName = testName.getMethodName() + "_1";
+ Stream.of(server1, server2, server3)
+ .forEach(server -> server.invoke(() -> createRegion(aeq1RegionName, shortcut, aeq1Id)));
+
+ // Create region attached to AEQ2 in all servers (region type comes from the test parameter)
+ String aeq2RegionName = testName.getMethodName() + "_2";
+ Stream.of(server1, server2, server3)
+ .forEach(server -> server.invoke(() -> createRegion(aeq2RegionName, shortcut, aeq2Id)));
+
+ // Create Client cache and proxy function region
+ createClientCacheAndRegion(locatorPort, functionExecutionRegionName);
+
+ // Launch threads to execute the Function from multiple threads in multiple members doing
+ // multiple operations against multiple regions
+ List<CompletableFuture<Void>> futures = launchTimedFunctionExecutionThreads(
+ functionExecutionRegionName, aeq1RegionName, aeq2RegionName, NUM_THREADS, TIME_TO_RUN);
+
+ // Wait for futures to complete. If they complete, the test is successful. If they timeout, the
+ // test is unsuccessful.
+ waitForFuturesToComplete(futures);
+ }
+
+ private void createAsyncEventQueue(String asyncEventQueueId,
+ AsyncEventListener asyncEventListener) {
+ ClusterStartupRule.getCache().createAsyncEventQueueFactory().setDispatcherThreads(1)
+ .setParallel(false).create(asyncEventQueueId, asyncEventListener);
+ }
+
+ private void createRegion(String regionName, RegionShortcut shortcut) {
+ createRegion(regionName, shortcut, null);
+ }
+
+ private void createRegion(String regionName, RegionShortcut shortcut, String asyncEventQueueId) {
+ RegionFactory factory = ClusterStartupRule.getCache().createRegionFactory(shortcut);
+ if (asyncEventQueueId != null) {
+ factory.addAsyncEventQueueId(asyncEventQueueId);
+ }
+ factory.create(regionName);
+ }
+
+ private void createClientCacheAndRegion(int port, String regionName) {
+ ClientCacheFactory clientCacheFactory =
+ new ClientCacheFactory().addPoolLocator(getHostName(), port).setPoolRetryAttempts(0);
+ clientCacheRule.createClientCache(clientCacheFactory);
+ clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+ .create(regionName);
+ }
+
+ private List<CompletableFuture<Void>> launchTimedFunctionExecutionThreads(
+ String functionExecutionRegionName, String aeg1RegionName, String aeg2RegionName,
+ int numThreads, long timeToRun) {
+ String[] possibleOperationRegionNames = new String[] {aeg1RegionName, aeg2RegionName};
+ String[] possibleOperations = new String[] {"put", "putAll", "destroy", "removeAll"};
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ futures.add(executorServiceRule.runAsync(
+ () -> executeFunctionTimed(RegionOperationsFunction.class.getSimpleName(),
+ functionExecutionRegionName, possibleOperationRegionNames, possibleOperations,
+ timeToRun)));
+ }
+ return futures;
+ }
+
+ private void executeFunctionTimed(String functionId, String regionName,
+ String[] possibleOperationRegionNames, String[] possibleOperations, long timeToRun) {
+ long endRun = System.currentTimeMillis() + timeToRun;
+ while (System.currentTimeMillis() < endRun) {
+ String operationRegionName =
+ possibleOperationRegionNames[RANDOM.nextInt(possibleOperationRegionNames.length)];
+ String operation = possibleOperations[RANDOM.nextInt(possibleOperations.length)];
+ FunctionService.onRegion(clientCacheRule.getClientCache().getRegion(regionName))
+ .setArguments(new String[] {operationRegionName, operation}).execute(functionId)
+ .getResult();
+ }
+ }
+
+ private void waitForFuturesToComplete(List<CompletableFuture<Void>> futures) {
+ try {
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])).get(
+ TIME_TO_WAIT,
+ MINUTES);
+ } catch (Exception e) {
+ // An exception occurred. Since this could be a deadlock, dump the stacks and kill the
+ // servers.
+ addIgnoredException("Possible loss of quorum");
+ addIgnoredException(ForcedDisconnectException.class);
+ server1.invoke(() -> dumpStacks());
+ server2.invoke(() -> dumpStacks());
+ server3.invoke(() -> dumpStacks());
+ clusterRule.crashVM(1);
+ clusterRule.crashVM(2);
+ clusterRule.crashVM(3);
+ fail(e.toString());
+ }
+ }
+
+ private void dumpStacks() {
+ OSProcess.printStacks(0);
+ }
+
+ public static class RegionOperationsFunction implements Function {
+
+ private final Cache cache;
+
+ private static final int NUM_ENTRIES = 10000;
+
+ public RegionOperationsFunction() {
+ this.cache = CacheFactory.getAnyInstance();
+ }
+
+ @Override
+ public void execute(FunctionContext context) {
+ String[] args = (String[]) context.getArguments();
+ String regionName = args[0];
+ String operation = args[1];
+ Region region = this.cache.getRegion(regionName);
+ switch (operation) {
+ case "put":
+ doPut(region);
+ break;
+ case "putAll":
+ doPutAll(region);
+ break;
+ case "destroy":
+ doDestroy(region);
+ break;
+ case "removeAll":
+ doRemoveAll(region);
+ break;
+ }
+ context.getResultSender().lastResult(true);
+ }
+
+ private void doPut(Region region) {
+ region.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+ }
+
+ private void doPutAll(Region region) {
+ Map events = new HashMap();
+ events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+ events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+ events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+ events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+ events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+ region.putAll(events);
+ }
+
+ private void doDestroy(Region region) {
+ try {
+ region.destroy(RANDOM.nextInt(NUM_ENTRIES));
+ } catch (EntryNotFoundException e) {
+ }
+ }
+
+ private void doRemoveAll(Region region) {
+ List keys = new ArrayList();
+ keys.add(RANDOM.nextInt(NUM_ENTRIES));
+ keys.add(RANDOM.nextInt(NUM_ENTRIES));
+ keys.add(RANDOM.nextInt(NUM_ENTRIES));
+ keys.add(RANDOM.nextInt(NUM_ENTRIES));
+ keys.add(RANDOM.nextInt(NUM_ENTRIES));
+ region.removeAll(keys);
+ }
+
+ @Override
+ public String getId() {
+ return getClass().getSimpleName();
+ }
+ }
+
+ private static class TestAsyncEventListener implements AsyncEventListener {
+
+ @Override
+ public boolean processEvents(List<AsyncEvent> events) {
+ return true;
+ }
+ }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
index 2485bd0..ef897e8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
@@ -413,7 +413,7 @@ public abstract class DistributionMessage implements DataSerializableFixedID, Cl
// If processing this message may need to add
- // to more than one serial gateway then don't
+ // to a serial gateway then don't
// do it inline.
- if (mayAddToMultipleSerialGateways(dm)) {
+ if (mayAddToSerialGateway(dm)) {
inlineProcess = false;
}
}
@@ -465,7 +465,7 @@ public abstract class DistributionMessage implements DataSerializableFixedID, Cl
} // not inline
}
- protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+ protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
// subclasses should override this method if processing
- // them may add to multiple serial gateways.
+ // them may add to a serial gateway.
return false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 639229a..fcf4ab0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -327,8 +327,8 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
}
@Override
- protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
- return _mayAddToMultipleSerialGateways(dm);
+ protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
+ return _mayAddToSerialGateway(dm);
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 4853090..cce7f8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -2367,8 +2367,8 @@ public class BucketRegion extends DistributedRegion implements Bucket {
}
@Override
- public boolean notifiesMultipleSerialGateways() {
- return getPartitionedRegion().notifiesMultipleSerialGateways();
+ public boolean notifiesSerialGateway() {
+ return getPartitionedRegion().notifiesSerialGateway();
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
index 8b3649d..4741161 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
@@ -206,8 +206,8 @@ public class DestroyOperation extends DistributedCacheOperation {
}
@Override
- protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
- return _mayAddToMultipleSerialGateways(dm);
+ protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
+ return _mayAddToSerialGateway(dm);
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 1699a7b..93f6c81 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -1529,14 +1529,14 @@ public abstract class DistributedCacheOperation {
this.hasOldValue = true;
}
- protected boolean _mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+ protected boolean _mayAddToSerialGateway(ClusterDistributionManager dm) {
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT);
try {
LocalRegion lr = getLocalRegionForProcessing(dm);
if (lr == null) {
return false;
}
- return lr.notifiesMultipleSerialGateways();
+ return lr.notifiesSerialGateway();
} catch (RuntimeException ignore) {
return false;
} finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 4411557..2239f62 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6077,9 +6077,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
/**
- * Returns true if this region notifies multiple serial gateways.
+ * Returns true if this region notifies any serial gateways.
*/
- public boolean notifiesMultipleSerialGateways() {
+ public boolean notifiesSerialGateway() {
if (isPdxTypesRegion()) {
return false;
}
@@ -6087,14 +6087,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
if (!allGatewaySenderIds.isEmpty()) {
List<Integer> allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds);
if (allRemoteDSIds != null) {
- int serialGatewayCount = 0;
for (GatewaySender sender : getCache().getAllGatewaySenders()) {
if (allGatewaySenderIds.contains(sender.getId())) {
if (!sender.isParallel()) {
- serialGatewayCount++;
- if (serialGatewayCount > 1) {
- return true;
- }
+ return true;
}
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
index 79ba365..2cfc877 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
@@ -457,7 +457,7 @@ public class DestroyMessage extends PartitionMessageWithDirectReply {
}
@Override
- protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+ protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
return _mayAddToMultipleSerialGateways(dm);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
index 3ef1b09..28bf024 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
@@ -710,7 +710,7 @@ public abstract class PartitionMessage extends DistributionMessage
if (pr == null) {
return false;
}
- return pr.notifiesMultipleSerialGateways();
+ return pr.notifiesSerialGateway();
} catch (PRLocallyDestroyedException ignore) {
return false;
} catch (RuntimeException ignore) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index a0d982c..3a58e41 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -687,7 +687,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
}
@Override
- protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+ protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
return _mayAddToMultipleSerialGateways(dm);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
index 5c64725..20c29da 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
@@ -823,7 +823,7 @@ public class PutMessage extends PartitionMessageWithDirectReply implements NewVa
}
@Override
- protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+ protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
return _mayAddToMultipleSerialGateways(dm);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 776eb7d..bb0ddeb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -680,7 +680,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
}
@Override
- protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+ protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
return _mayAddToMultipleSerialGateways(dm);
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
index 7beced8..a70fa8c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
@@ -34,7 +34,7 @@ public class CacheOperationMessageTest {
ClusterDistributionManager mockDistributionManager = mock(ClusterDistributionManager.class);
when(mockCacheOperationMessage.supportsDirectAck()).thenReturn(true);
- when(mockCacheOperationMessage._mayAddToMultipleSerialGateways(eq(mockDistributionManager)))
+ when(mockCacheOperationMessage._mayAddToSerialGateway(eq(mockDistributionManager)))
.thenReturn(true);
mockCacheOperationMessage.process(mockDistributionManager);
@@ -42,7 +42,7 @@ public class CacheOperationMessageTest {
verify(mockCacheOperationMessage, times(1)).process(mockDistributionManager);
assertThat(mockCacheOperationMessage.supportsDirectAck()).isTrue();
- assertThat(mockCacheOperationMessage._mayAddToMultipleSerialGateways(mockDistributionManager))
+ assertThat(mockCacheOperationMessage._mayAddToSerialGateway(mockDistributionManager))
.isTrue();
}
}