You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bo...@apache.org on 2019/05/31 00:33:25 UTC

[geode] 01/01: GEODE-6821: Shared P2P reader no longer processes messages on regions with a serial sender inline

This is an automated email from the ASF dual-hosted git repository.

boglesby pushed a commit to branch feature/GEODE-6821
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 3ecdc1f0a10acc7fa45038fb3bcd321b3a712a5b
Author: Barry Oglesby <bo...@pivotal.io>
AuthorDate: Thu May 30 17:30:48 2019 -0700

    GEODE-6821: Shared P2P reader no longer processes messages on regions with a serial sender inline
---
 ...ListenersDifferentPrimariesDistributedTest.java | 317 +++++++++++++++++++++
 .../distributed/internal/DistributionMessage.java  |   4 +-
 .../internal/cache/AbstractUpdateOperation.java    |   4 +-
 .../apache/geode/internal/cache/BucketRegion.java  |   4 +-
 .../geode/internal/cache/DestroyOperation.java     |   4 +-
 .../internal/cache/DistributedCacheOperation.java  |   4 +-
 .../apache/geode/internal/cache/LocalRegion.java   |  10 +-
 .../internal/cache/partitioned/DestroyMessage.java |   2 +-
 .../cache/partitioned/PartitionMessage.java        |   2 +-
 .../cache/partitioned/PutAllPRMessage.java         |   2 +-
 .../internal/cache/partitioned/PutMessage.java     |   2 +-
 .../cache/partitioned/RemoveAllPRMessage.java      |   2 +-
 .../internal/cache/CacheOperationMessageTest.java  |   4 +-
 13 files changed, 337 insertions(+), 24 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/SerialAsyncEventListenersDifferentPrimariesDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/SerialAsyncEventListenersDifferentPrimariesDistributedTest.java
new file mode 100644
index 0000000..fecf7ce
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/wan/asyncqueue/SerialAsyncEventListenersDifferentPrimariesDistributedTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.asyncqueue;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Stream;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.ForcedDisconnectException;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.EntryNotFoundException;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.asyncqueue.AsyncEvent;
+import org.apache.geode.cache.asyncqueue.AsyncEventListener;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.internal.OSProcess;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.AEQTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+@Category(AEQTest.class)
+@SuppressWarnings("serial")
+@RunWith(JUnitParamsRunner.class)
+public class SerialAsyncEventListenersDifferentPrimariesDistributedTest implements Serializable {
+
+  private MemberVM locator;
+
+  private MemberVM server1;
+
+  private MemberVM server2;
+
+  private MemberVM server3;
+
+  @Rule
+  public ClusterStartupRule clusterRule = new ClusterStartupRule();
+
+  @Rule
+  public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+  @Rule
+  public SerializableTestName testName = new SerializableTestName();
+
+  @Rule
+  public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
+
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_THREADS = 20;
+
+  private static final long TIME_TO_RUN = 30000; // milliseconds
+
+  private static final long TIME_TO_WAIT = 1; // minutes
+
+  @Test
+  @Parameters({"REPLICATE", "PARTITION", "PARTITION_REDUNDANT"})
+  public void testMultithreadedFunctionExecutionsDoingRegionOperations(RegionShortcut shortcut) {
+    // This is a test for GEODE-6821.
+    //
+    // 3 servers each with:
+    // - a function that performs a random region operation on the input region
+    // - a replicated region on which the function is executed
+    // - two regions each with a serial AEQ (the region type is the 'shortcut' test parameter:
+    // replicate, partition or partition_redundant)
+    //
+    // 1 multi-threaded client that repeatedly executes the function for a specified length of time
+    // with random region names and operations.
+    //
+    // This test will deadlock pretty much immediately if the shared P2P reader thread processes any
+    // replication.
+
+    // Start Locator
+    locator = clusterRule.startLocatorVM(0);
+
+    // Start servers
+    int locatorPort = locator.getPort();
+    server1 = clusterRule.startServerVM(1, s -> s.withConnectionToLocator(locatorPort));
+    server2 = clusterRule.startServerVM(2, s -> s.withConnectionToLocator(locatorPort));
+    server3 = clusterRule.startServerVM(3, s -> s.withConnectionToLocator(locatorPort));
+
+    // Register Function in all servers
+    Stream.of(server1, server2, server3).forEach(server -> server
+        .invoke(() -> FunctionService.registerFunction(new RegionOperationsFunction())));
+
+    // Create AEQ1 in all servers with server1 as primary
+    String aeq1Id = testName.getMethodName() + "_1";
+    Stream.of(server1, server2, server3).forEach(
+        server -> server.invoke(() -> createAsyncEventQueue(aeq1Id, new TestAsyncEventListener())));
+
+    // Create AEQ2 in all servers with server2 as primary
+    String aeq2Id = testName.getMethodName() + "_2";
+    Stream.of(server2, server1, server3).forEach(
+        server -> server.invoke(() -> createAsyncEventQueue(aeq2Id, new TestAsyncEventListener())));
+
+    // Create region the function is executed on in all servers
+    String functionExecutionRegionName = testName.getMethodName() + "_functionExecutor";
+    Stream.of(server1, server2, server3).forEach(
+        server -> server.invoke(() -> createRegion(functionExecutionRegionName, REPLICATE)));
+
+    // Create region attached to AEQ1 in all servers, using the parameterized region type
+    String aeq1RegionName = testName.getMethodName() + "_1";
+    Stream.of(server1, server2, server3)
+        .forEach(server -> server.invoke(() -> createRegion(aeq1RegionName, shortcut, aeq1Id)));
+
+    // Create region attached to AEQ2 in all servers, using the parameterized region type
+    String aeq2RegionName = testName.getMethodName() + "_2";
+    Stream.of(server1, server2, server3)
+        .forEach(server -> server.invoke(() -> createRegion(aeq2RegionName, shortcut, aeq2Id)));
+
+    // Create Client cache and proxy function region
+    createClientCacheAndRegion(locatorPort, functionExecutionRegionName);
+
+    // Launch threads to execute the Function from multiple threads in multiple members doing
+    // multiple operations against multiple regions
+    List<CompletableFuture<Void>> futures = launchTimedFunctionExecutionThreads(
+        functionExecutionRegionName, aeq1RegionName, aeq2RegionName, NUM_THREADS, TIME_TO_RUN);
+
+    // Wait for futures to complete. If they complete, the test is successful. If they timeout, the
+    // test is unsuccessful.
+    waitForFuturesToComplete(futures);
+  }
+
+  private void createAsyncEventQueue(String asyncEventQueueId,
+      AsyncEventListener asyncEventListener) {
+    ClusterStartupRule.getCache().createAsyncEventQueueFactory().setDispatcherThreads(1)
+        .setParallel(false).create(asyncEventQueueId, asyncEventListener);
+  }
+
+  private void createRegion(String regionName, RegionShortcut shortcut) {
+    createRegion(regionName, shortcut, null);
+  }
+
+  private void createRegion(String regionName, RegionShortcut shortcut, String asyncEventQueueId) {
+    RegionFactory<?, ?> factory = ClusterStartupRule.getCache().createRegionFactory(shortcut);
+    if (asyncEventQueueId != null) { // null means the region is created without an AEQ
+      factory.addAsyncEventQueueId(asyncEventQueueId);
+    }
+    factory.create(regionName);
+  }
+
+  private void createClientCacheAndRegion(int port, String regionName) {
+    ClientCacheFactory clientCacheFactory =
+        new ClientCacheFactory().addPoolLocator(getHostName(), port).setPoolRetryAttempts(0);
+    clientCacheRule.createClientCache(clientCacheFactory);
+    clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+        .create(regionName);
+  }
+
+  private List<CompletableFuture<Void>> launchTimedFunctionExecutionThreads(
+      String functionExecutionRegionName, String aeg1RegionName, String aeg2RegionName,
+      int numThreads, long timeToRun) {
+    String[] possibleOperationRegionNames = new String[] {aeg1RegionName, aeg2RegionName};
+    String[] possibleOperations = new String[] {"put", "putAll", "destroy", "removeAll"};
+    List<CompletableFuture<Void>> futures = new ArrayList<>();
+    for (int i = 0; i < numThreads; i++) {
+      futures.add(executorServiceRule.runAsync(
+          () -> executeFunctionTimed(RegionOperationsFunction.class.getSimpleName(),
+              functionExecutionRegionName, possibleOperationRegionNames, possibleOperations,
+              timeToRun)));
+    }
+    return futures;
+  }
+
+  private void executeFunctionTimed(String functionId, String regionName,
+      String[] possibleOperationRegionNames, String[] possibleOperations, long timeToRun) {
+    long endRun = System.currentTimeMillis() + timeToRun;
+    while (System.currentTimeMillis() < endRun) {
+      String operationRegionName =
+          possibleOperationRegionNames[RANDOM.nextInt(possibleOperationRegionNames.length)];
+      String operation = possibleOperations[RANDOM.nextInt(possibleOperations.length)];
+      FunctionService.onRegion(clientCacheRule.getClientCache().getRegion(regionName))
+          .setArguments(new String[] {operationRegionName, operation}).execute(functionId)
+          .getResult();
+    }
+  }
+
+  private void waitForFuturesToComplete(List<CompletableFuture<Void>> futures) {
+    try {
+      CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()])).get(
+          TIME_TO_WAIT,
+          MINUTES);
+    } catch (Exception e) {
+      // An exception occurred. Since this could be a deadlock, dump the stacks and kill the
+      // servers.
+      addIgnoredException("Possible loss of quorum");
+      addIgnoredException(ForcedDisconnectException.class);
+      server1.invoke(() -> dumpStacks());
+      server2.invoke(() -> dumpStacks());
+      server3.invoke(() -> dumpStacks());
+      clusterRule.crashVM(1);
+      clusterRule.crashVM(2);
+      clusterRule.crashVM(3);
+      fail(e.toString());
+    }
+  }
+
+  private void dumpStacks() {
+    OSProcess.printStacks(0);
+  }
+
+  public static class RegionOperationsFunction implements Function {
+
+    private final Cache cache;
+
+    private static final int NUM_ENTRIES = 10000; // keys are drawn from [0, NUM_ENTRIES)
+
+    public RegionOperationsFunction() {
+      this.cache = CacheFactory.getAnyInstance();
+    }
+
+    @Override
+    public void execute(FunctionContext context) {
+      String[] args = (String[]) context.getArguments(); // {regionName, operation}
+      String regionName = args[0];
+      String operation = args[1];
+      Region<Integer, String> region = this.cache.getRegion(regionName);
+      switch (operation) {
+        case "put":
+          doPut(region);
+          break;
+        case "putAll":
+          doPutAll(region);
+          break;
+        case "destroy":
+          doDestroy(region);
+          break;
+        case "removeAll":
+          doRemoveAll(region);
+          break;
+      }
+      context.getResultSender().lastResult(true);
+    }
+
+    private void doPut(Region<Integer, String> region) {
+      region.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+    }
+
+    private void doPutAll(Region<Integer, String> region) { // up to five random keys per call
+      Map<Integer, String> events = new HashMap<>();
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      events.put(RANDOM.nextInt(NUM_ENTRIES), "value");
+      region.putAll(events);
+    }
+
+    private void doDestroy(Region<Integer, String> region) {
+      try {
+        region.destroy(RANDOM.nextInt(NUM_ENTRIES));
+      } catch (EntryNotFoundException e) { // ignored: the randomly chosen key may not exist
+      }
+    }
+
+    private void doRemoveAll(Region<Integer, String> region) { // five random keys per call
+      List<Integer> keys = new ArrayList<>();
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      keys.add(RANDOM.nextInt(NUM_ENTRIES));
+      region.removeAll(keys);
+    }
+
+    @Override
+    public String getId() {
+      return getClass().getSimpleName();
+    }
+  }
+
+  private static class TestAsyncEventListener implements AsyncEventListener {
+
+    @Override
+    public boolean processEvents(List<AsyncEvent> events) {
+      return true;
+    }
+  }
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
index 2485bd0..ef897e8 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionMessage.java
@@ -413,7 +413,7 @@ public abstract class DistributionMessage implements DataSerializableFixedID, Cl
       // If processing this message may need to add
       // to more than one serial gateway then don't
       // do it inline.
-      if (mayAddToMultipleSerialGateways(dm)) {
+      if (mayAddToSerialGateway(dm)) {
         inlineProcess = false;
       }
     }
@@ -465,7 +465,7 @@ public abstract class DistributionMessage implements DataSerializableFixedID, Cl
     } // not inline
   }
 
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     // subclasses should override this method if processing
     // them may add to multiple serial gateways.
     return false;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
index 639229a..fcf4ab0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractUpdateOperation.java
@@ -327,8 +327,8 @@ public abstract class AbstractUpdateOperation extends DistributedCacheOperation
     }
 
     @Override
-    protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
-      return _mayAddToMultipleSerialGateways(dm);
+    protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
+      return _mayAddToSerialGateway(dm);
     }
   }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index 4853090..cce7f8d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -2367,8 +2367,8 @@ public class BucketRegion extends DistributedRegion implements Bucket {
   }
 
   @Override
-  public boolean notifiesMultipleSerialGateways() {
-    return getPartitionedRegion().notifiesMultipleSerialGateways();
+  public boolean notifiesSerialGateway() {
+    return getPartitionedRegion().notifiesSerialGateway();
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
index 8b3649d..4741161 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DestroyOperation.java
@@ -206,8 +206,8 @@ public class DestroyOperation extends DistributedCacheOperation {
     }
 
     @Override
-    protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
-      return _mayAddToMultipleSerialGateways(dm);
+    protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
+      return _mayAddToSerialGateway(dm);
     }
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
index 1699a7b..93f6c81 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedCacheOperation.java
@@ -1529,14 +1529,14 @@ public abstract class DistributedCacheOperation {
       this.hasOldValue = true;
     }
 
-    protected boolean _mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+    protected boolean _mayAddToSerialGateway(ClusterDistributionManager dm) {
       int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT);
       try {
         LocalRegion lr = getLocalRegionForProcessing(dm);
         if (lr == null) {
           return false;
         }
-        return lr.notifiesMultipleSerialGateways();
+        return lr.notifiesSerialGateway();
       } catch (RuntimeException ignore) {
         return false;
       } finally {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 4411557..2239f62 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -6077,9 +6077,9 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
   }
 
   /**
-   * Returns true if this region notifies multiple serial gateways.
+   * Returns true if this region notifies any serial gateways.
    */
-  public boolean notifiesMultipleSerialGateways() {
+  public boolean notifiesSerialGateway() {
     if (isPdxTypesRegion()) {
       return false;
     }
@@ -6087,14 +6087,10 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
     if (!allGatewaySenderIds.isEmpty()) {
       List<Integer> allRemoteDSIds = getRemoteDsIds(allGatewaySenderIds);
       if (allRemoteDSIds != null) {
-        int serialGatewayCount = 0;
         for (GatewaySender sender : getCache().getAllGatewaySenders()) {
           if (allGatewaySenderIds.contains(sender.getId())) {
             if (!sender.isParallel()) {
-              serialGatewayCount++;
-              if (serialGatewayCount > 1) {
-                return true;
-              }
+              return true;
             }
           }
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
index 79ba365..2cfc877 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/DestroyMessage.java
@@ -457,7 +457,7 @@ public class DestroyMessage extends PartitionMessageWithDirectReply {
   }
 
   @Override
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     return _mayAddToMultipleSerialGateways(dm);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
index 3ef1b09..28bf024 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PartitionMessage.java
@@ -710,7 +710,7 @@ public abstract class PartitionMessage extends DistributionMessage
       if (pr == null) {
         return false;
       }
-      return pr.notifiesMultipleSerialGateways();
+      return pr.notifiesSerialGateway();
     } catch (PRLocallyDestroyedException ignore) {
       return false;
     } catch (RuntimeException ignore) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index a0d982c..3a58e41 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -687,7 +687,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
   }
 
   @Override
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     return _mayAddToMultipleSerialGateways(dm);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
index 5c64725..20c29da 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutMessage.java
@@ -823,7 +823,7 @@ public class PutMessage extends PartitionMessageWithDirectReply implements NewVa
   }
 
   @Override
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     return _mayAddToMultipleSerialGateways(dm);
   }
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index 776eb7d..bb0ddeb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -680,7 +680,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
   }
 
   @Override
-  protected boolean mayAddToMultipleSerialGateways(ClusterDistributionManager dm) {
+  protected boolean mayAddToSerialGateway(ClusterDistributionManager dm) {
     return _mayAddToMultipleSerialGateways(dm);
   }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
index 7beced8..a70fa8c 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/CacheOperationMessageTest.java
@@ -34,7 +34,7 @@ public class CacheOperationMessageTest {
     ClusterDistributionManager mockDistributionManager = mock(ClusterDistributionManager.class);
 
     when(mockCacheOperationMessage.supportsDirectAck()).thenReturn(true);
-    when(mockCacheOperationMessage._mayAddToMultipleSerialGateways(eq(mockDistributionManager)))
+    when(mockCacheOperationMessage._mayAddToSerialGateway(eq(mockDistributionManager)))
         .thenReturn(true);
 
     mockCacheOperationMessage.process(mockDistributionManager);
@@ -42,7 +42,7 @@ public class CacheOperationMessageTest {
     verify(mockCacheOperationMessage, times(1)).process(mockDistributionManager);
 
     assertThat(mockCacheOperationMessage.supportsDirectAck()).isTrue();
-    assertThat(mockCacheOperationMessage._mayAddToMultipleSerialGateways(mockDistributionManager))
+    assertThat(mockCacheOperationMessage._mayAddToSerialGateway(mockDistributionManager))
         .isTrue();
   }
 }