You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/05/31 00:33:24 UTC

[geode] branch feature/GEODE-6821 created (now 3ecdc1f)

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a change to branch feature/GEODE-6821
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at 3ecdc1f  GEODE-6821: Shared P2P reader no longer processes messages on regions with a serial sender inline

This branch includes the following new commits:

     new 3ecdc1f  GEODE-6821: Shared P2P reader no longer processes messages on regions with a serial sender inline

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[geode] 01/01: GEODE-6821: Shared P2P reader no longer processes messages on regions with a serial sender inline

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-6821
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 3ecdc1f0a10acc7fa45038fb3bcd321b3a712a5b
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Thu May 30 17:30:48 2019 -0700

    GEODE-6821: Shared P2P reader no longer processes messages on regions with a serial sender inline
---
 ...ListenersDifferentPrimariesDistributedTest.java | 317 +++++++++++++++++++++
 .../distributed/internal/DistributionMessage.java  |   4 +-
 .../internal/cache/AbstractUpdateOperation.java    |   4 +-
 .../apache/geode/internal/cache/BucketRegion.java  |   4 +-
 .../geode/internal/cache/DestroyOperation.java     |   4 +-
 .../internal/cache/DistributedCacheOperation.java  |   4 +-
 .../apache/geode/internal/cache/LocalRegion.java   |  10 +-
 .../internal/cache/partitioned/DestroyMessage.java |   2 +-
 .../cache/partitioned/PartitionMessage.java        |   2 +-
 .../cache/partitioned/PutAllPRMessage.java         |   2 +-
 .../internal/cache/partitioned/PutMessage.java     |   2 +-
 .../cache/partitioned/RemoveAllPRMessage.java      |   2 +-
 .../internal/cache/CacheOperationMessageTest.java  |   4 +-
 13 files changed, 337 insertions(+), 24 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/SerialAsyncEventListenersDifferentPrimariesDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/SerialAsyncEventListenersDifferentPrimariesDistributedTest.java
new file mode 100644
index 0000000..fecf7ce
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/SerialAsyncEventListenersDifferentPrimariesDistributedTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.asyncqueue;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.internal.OSProcess;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.AEQTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(AEQTest.class)
+@SuppressWarnings("serial")
+@RunWith(JUnitParamsRunner.class)
+public class SerialAsyncEventListenersDifferentPrimariesDistributedTest implements Serializable {
+
+  private MemberVM locator;
+
+  private MemberVM server1;
+
+  private MemberVM server2;
+
+  private MemberVM server3;
+
+  @Rule
+  public ClusterStartupRule clusterRule = new ClusterStartupRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_THREADS = 20;
+
+  private static final long TIME_TO_RUN = 30000; // milliseconds
+
+  private static final long TIME_TO_WAIT = 1; // minutes
+
+  @Test
+  @Parameters({"REPLICATE", "PARTITION", "PARTITION_REDUNDANT"})
+  public void testMultithreadedFunctionExecutionsDoingRegionOperations(RegionShortcut shortcut) {
+    // This is a test for GEODE-6821.
+    //
+    // 3 servers each with:
+    // - a function that performs a random region operation on the input region
+    // - a replicated region on which the function is executed
+    // - two regions each with a serial AEQ (the type of region is the 'shortcut' parameter:
+    // replicate, partition or partition_redundant)
+    //
+    // 1 multi-threaded client that repeatedly executes the function for a specified length of time
+    // with random region names and operations.
+    //
+    // This test will deadlock pretty much immediately if the shared P2P reader thread processes any
+    // replication.
+
+    // Start Locator
+    locator = clusterRule.startLocatorVM(0);
+
+    // Start servers
+    int locatorPort = locator.getPort();
+    server1 = clusterRule.startServerVM(1, s -> s.withConnectionToLocator(locatorPort));
+    server2 = clusterRule.startServerVM(2, s -> s.withConnectionToLocator(locatorPort));
+    server3 = clusterRule.startServerVM(3, s -> s.withConnectionToLocator(locatorPort));
+
+    // Register Function in all servers
+    Stream.of(server1, server2, server3).forEach(server -> server
+        .invoke(() -> FunctionService.registerFunction(new RegionOperationsFunction())));
+
+    // Create AEQ1 in all servers with server1 as primary
+    String aeq1Id = testName.getMethodName() + "_1";
+    Stream.of(server1, server2, server3).forEach(
+        server -> server.invoke(() -> createAsyncEventQueue(aeq1Id, new TestAsyncEventListener())));
+
+    // Create AEQ2 in all servers with server2 as primary
+    String aeq2Id = testName.getMethodName() + "_2";
+    Stream.of(server2, server1, server3).forEach(
+        server -> server.invoke(() -> createAsyncEventQueue(aeq2Id, new TestAsyncEventListener())));
+
+    // Create region the function is executed on in all servers (always replicated)
+    String functionExecutionRegionName = testName.getMethodName() + "_functionExecutor";
+    Stream.of(server1, server2, server3).forEach(
+        server -> server.invoke(() -> createRegion(functionExecutionRegionName, REPLICATE)));
+
+    // Create region of the parameterized type attached to AEQ1 in all servers
+    String aeg1RegionName = testName.getMethodName() + "_1";
+    Stream.of(server1, server2, server3)
+        .forEach(server -> server.invoke(() -> createRegion(aeg1RegionName, shortcut, aeq1Id)));
+
+    // Create region of the parameterized type attached to AEQ2 in all servers
+    String aeg2RegionName = testName.getMethodName() + "_2";
+    Stream.of(server1, server2, server3)
+        .forEach(server -> server.invoke(() -> createRegion(aeg2RegionName, shortcut, aeq2Id)));
+
+    // Create Client cache and proxy function region
+    createClientCacheAndRegion(locatorPort, functionExecutionRegionName);
+
+    // Launch threads to execute the Function from multiple threads in multiple members doing
+    // multiple operations against multiple regions
+    List<CompletableFuture<Void>> futures = launchTimedFunctionExecutionThreads(
+        functionExecutionRegionName, aeg1RegionName, aeg2RegionName, NUM_THREADS, TIME_TO_RUN);
+
+    // Wait for futures to complete. If they complete, the test is successful. If they timeout, the
+    // test is unsuccessful.
+    waitForFuturesToComplete(futures);
+  }
+
+  private void createAsyncEventQueue(String asyncEventQueueId,
+      AsyncEventListener asyncEventListener) {
+    ClusterStartupRule.getCache().createAsyncEventQueueFactory().setDispatcherThreads(1)
+        .setParallel(false).create(asyncEventQueueId, asyncEventListener);
+  }
+
+  private void createRegion(String regionName, RegionShortcut shortcut) {
+    createRegion(regionName, shortcut, null);
+  }
+
+  private void createRegion(String regionName, RegionShortcut shortcut, String asyncEventQueueId) {
+    RegionFactory factory = ClusterStartupRule.getCache().createRegionFactory(shortcut);
+    if (asyncEventQueueId != null) {
+      factory.addAsyncEventQueueId(asyncEventQueueId);
+    }
+    factory.create(regionName);
+  }
+
+  private void createClientCacheAndRegion(int port, String regionName) {
+    ClientCacheFactory clientCacheFactory =
+        new ClientCacheFactory().addPoolLocator(getHostName(), port).setPoolRetryAttempts(0);
+    clientCacheRule.createClientCache(clientCacheFactory);
+    clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+        .create(regionName);
+  }
+
+  private List<CompletableFuture<Void>> launchTimedFunctionExecutionThreads(
+      String functionExecutionRegionName, String aeg1RegionName, String aeg2RegionName,
+      int numThreads, long timeToRun) {
+    String[] possibleOperationRegionNames = new String[] {aeg1RegionName, aeg2RegionName};
+    String[] possibleOperations = new String[] {"put", "putAll", "destroy", "removeAll"};
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    for (int i = 0; i < numThreads; i++) {
+      futures.add(executorServiceRule.runAsync(
+          () -> executeFunctionTimed(RegionOperationsFunction.class.getSimpleName(),
+              functionExecutionRegionName, possibleOperationRegionNames, possibleOperations,
+              timeToRun)));
+    }
+    return futures;
+  }
+
+  private void executeFunctionTimed(String functionId, String regionName,
+      String[] possibleOperationRegionNames, String[] possibleOperations, long timeToRun) {
+    long endRun = System.currentTimeMillis() + timeToRun;
+    while (System.currentTimeMillis() < endRun) {
+      String operationRegionName =
+          possibleOperationRegionNames[RANDOM.nextInt(possibleOperationRegionNames.length)];
+      String operation = possibleOperations[RANDOM.nextInt(possibleOperations.length)];
+      FunctionService.onRegion(clientCacheRule.getClientCache().getRegion(regionName))
+          .setArguments(new String[] {operationRegionName, operation}).execute(functionId)
+          .getResult();
+    }
+  }
+
+  private void waitForFuturesToComplete(List<CompletableFuture<Void>> futures) {
+    try {
+      CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])).get(
+          TIME_TO_WAIT,
+          MINUTES);
+    } catch (Exception e) {
+      // An exception occurred. Since this could be a deadlock, dump the stacks and kill the
+      // servers.
+      addIgnoredException("Possible loss of quorum");
+      addIgnoredException(ForcedDisconnectException.class);
+      server1.invoke(() -> dumpStacks());
+      server2.invoke(() -> dumpStacks());
+      server3.invoke(() -> dumpStacks());
+      clusterRule.crashVM(1);
+      clusterRule.crashVM(2);
+      clusterRule.crashVM(3);
+      fail(e.toString());
+    }
+  }
+
+  private void dumpStacks() {
+    OSProcess.printStacks(0);
+  }
+
+  public static class RegionOperationsFunction implements Function {
+
+    private final Cache cache;
+
+    private static final int NUM_ENTRIES = 10000;
+
+    public RegionOperationsFunction() {
+      this.cache = CacheFactory.getAnyInstance();
+    }
+
+    @Override
+    public void execute(FunctionContext context) {
+      String[] args = (String[]) context.getArguments();
+      String regionName = args[0];
+      String operation = args[1];
+      Region region = this.cache.getRegion(regionName);
+      switch (operation) {
+        case "put":
+          doPut(region);
+          break;
+        case "putAll":
+          doPutAll(region);
+          break;
+        case "destroy":
+          doDestroy(region);
+          break;
+        case "removeAll":
+          doRemoveAll(region);
+          break;
+      }
+      context.getResultSender().lastResult(true);
+    }
+
+    private void doPut(Region region) {
+      region.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+    }
+
+    private void doPutAll(Region region) {
+      Map events = new HashMap();
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      region.putAll(events);
+    }
+
+    private void doDestroy(Region region) {
+      try {
+        region.destroy(RANDOM.nextInt(NUM_ENTRIES));
+      } catch (EntryNotFoundException e) { // the randomly chosen key may be absent; ignore it
+      }
+    }
+
+    private void doRemoveAll(Region region) {
+      List keys = new ArrayList();
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      region.removeAll(keys);
+    }
+
+    @Override
+    public String getId() {
+      return getClass().getSimpleName();
+    }
+  }
+
+  private static class TestAsyncEventListener implements AsyncEventListener {
+
+    @Override
+    public boolean processEvents(List<AsyncEvent> events) {
+      return true;
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
index 2485bd0..ef897e8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
@@ -413,7 +413,7 @@ public abstract class DistributionMessage implements DataSerializableFixedID, Cl
       // If processing this message may need to add
       // to more than one serial gateway then don't
       // do it inline.
-      if (mayAddToMultipleSerialGateways(dm)) {
+      if (mayAddToSerialGateway(dm)) {
         inlineProcess = false;
       }
     }
@@ -465,7 +465,7 @@ public abstract class DistributionMessage implements DataSerializableFixedID, Cl
     } // not inline
   }
 
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     // subclasses should override this method if processing
     // them may add to multiple serial gateways.
     return false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 639229a..fcf4ab0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -327,8 +327,8 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
     }
 
     @Override
-    protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
-      return _mayAddToMultipleSerialGateways(dm);
+    protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
+      return _mayAddToSerialGateway(dm);
     }
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 4853090..cce7f8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -2367,8 +2367,8 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   @Override
-  public boolean notifiesMultipleSerialGateways() {
-    return getPartitionedRegion().notifiesMultipleSerialGateways();
+  public boolean notifiesSerialGateway() {
+    return getPartitionedRegion().notifiesSerialGateway();
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
index 8b3649d..4741161 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
@@ -206,8 +206,8 @@ public class DestroyOperation extends DistributedCacheOperation {
     }
 
     @Override
-    protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
-      return _mayAddToMultipleSerialGateways(dm);
+    protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
+      return _mayAddToSerialGateway(dm);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 1699a7b..93f6c81 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -1529,14 +1529,14 @@ public abstract class DistributedCacheOperation {
       this.hasOldValue = true;
     }
 
-    protected boolean _mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+    protected boolean _mayAddToSerialGateway(ClusterDistributionManager dm) {
       int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT);
       try {
         LocalRegion lr = getLocalRegionForProcessing(dm);
         if (lr == null) {
           return false;
         }
-        return lr.notifiesMultipleSerialGateways();
+        return lr.notifiesSerialGateway();
       } catch (RuntimeException ignore) {
         return false;
       } finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 4411557..2239f62 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6077,9 +6077,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   }
 
   /**
-   * Returns true if this region notifies multiple serial gateways.
+   * Returns true if this region notifies any serial gateways.
    */
-  public boolean notifiesMultipleSerialGateways() {
+  public boolean notifiesSerialGateway() {
     if (isPdxTypesRegion()) {
       return false;
     }
@@ -6087,14 +6087,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     if (!allGatewaySenderIds.isEmpty()) {
       List<Integer> allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds);
       if (allRemoteDSIds != null) {
-        int serialGatewayCount = 0;
         for (GatewaySender sender : getCache().getAllGatewaySenders()) {
           if (allGatewaySenderIds.contains(sender.getId())) {
             if (!sender.isParallel()) {
-              serialGatewayCount++;
-              if (serialGatewayCount > 1) {
-                return true;
-              }
+              return true;
             }
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
index 79ba365..2cfc877 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
@@ -457,7 +457,7 @@ public class DestroyMessage extends PartitionMessageWithDirectReply {
   }
 
   @Override
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     return _mayAddToMultipleSerialGateways(dm);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
index 3ef1b09..28bf024 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
@@ -710,7 +710,7 @@ public abstract class PartitionMessage extends DistributionMessage
       if (pr == null) {
         return false;
       }
-      return pr.notifiesMultipleSerialGateways();
+      return pr.notifiesSerialGateway();
     } catch (PRLocallyDestroyedException ignore) {
       return false;
     } catch (RuntimeException ignore) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index a0d982c..3a58e41 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -687,7 +687,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
   }
 
   @Override
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     return _mayAddToMultipleSerialGateways(dm);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
index 5c64725..20c29da 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
@@ -823,7 +823,7 @@ public class PutMessage extends PartitionMessageWithDirectReply implements NewVa
   }
 
   @Override
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     return _mayAddToMultipleSerialGateways(dm);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 776eb7d..bb0ddeb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -680,7 +680,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
   }
 
   @Override
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     return _mayAddToMultipleSerialGateways(dm);
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
index 7beced8..a70fa8c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
@@ -34,7 +34,7 @@ public class CacheOperationMessageTest {
     ClusterDistributionManager mockDistributionManager = mock(ClusterDistributionManager.class);
 
     when(mockCacheOperationMessage.supportsDirectAck()).thenReturn(true);
-    when(mockCacheOperationMessage._mayAddToMultipleSerialGateways(eq(mockDistributionManager)))
+    when(mockCacheOperationMessage._mayAddToSerialGateway(eq(mockDistributionManager)))
         .thenReturn(true);
 
     mockCacheOperationMessage.process(mockDistributionManager);
@@ -42,7 +42,7 @@ public class CacheOperationMessageTest {
     verify(mockCacheOperationMessage, times(1)).process(mockDistributionManager);
 
     assertThat(mockCacheOperationMessage.supportsDirectAck()).isTrue();
-    assertThat(mockCacheOperationMessage._mayAddToMultipleSerialGateways(mockDistributionManager))
+    assertThat(mockCacheOperationMessage._mayAddToSerialGateway(mockDistributionManager))
         .isTrue();
   }
 }