You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/03/30 14:58:26 UTC

[GitHub] [geode] albertogpz commented on a change in pull request #5630: GEODE-8605: Alter Gateway Sender command

albertogpz commented on a change in pull request #5630:
URL: https://github.com/apache/geode/pull/5630#discussion_r604136641



##########
File path: geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
##########
@@ -2689,6 +2689,12 @@ public void setOverflowDirectory(String value) {
       return this.gatewayEventFilters;
     }
 
+    public boolean areGatewayEventFiltersUpdated() {
+      if (gatewayEventFilters == null)
+        return false;
+      return true;

Review comment:
       How about something more compact, such as `return gatewayEventFilters != null;`?
   

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
##########
@@ -31,63 +31,201 @@
   public static final boolean DEFAULT_IS_META_QUEUE = false;
 
 
-  public int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE;
+  private int socketBufferSize = GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE;
 
-  public int socketReadTimeout = GatewaySender.DEFAULT_SOCKET_READ_TIMEOUT;
+  private int socketReadTimeout = GatewaySender.DEFAULT_SOCKET_READ_TIMEOUT;
 
-  public int maximumQueueMemory = GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY;
+  private int maximumQueueMemory = GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY;
 
-  public int batchSize = GatewaySender.DEFAULT_BATCH_SIZE;
+  private int batchSize = GatewaySender.DEFAULT_BATCH_SIZE;
 
-  public int batchTimeInterval = GatewaySender.DEFAULT_BATCH_TIME_INTERVAL;
+  private int batchTimeInterval = GatewaySender.DEFAULT_BATCH_TIME_INTERVAL;
 
-  public boolean isBatchConflationEnabled = GatewaySender.DEFAULT_BATCH_CONFLATION;
+  private boolean isBatchConflationEnabled = GatewaySender.DEFAULT_BATCH_CONFLATION;
 
-  public boolean isPersistenceEnabled = GatewaySender.DEFAULT_PERSISTENCE_ENABLED;
+  private boolean isPersistenceEnabled = GatewaySender.DEFAULT_PERSISTENCE_ENABLED;
 
-  public int alertThreshold = GatewaySender.DEFAULT_ALERT_THRESHOLD;
+  private int alertThreshold = GatewaySender.DEFAULT_ALERT_THRESHOLD;
 
-  public boolean manualStart = GatewaySender.DEFAULT_MANUAL_START;
+  private boolean manualStart = GatewaySender.DEFAULT_MANUAL_START;
 
-  public String diskStoreName;
+  private String diskStoreName;
 
-  public List<GatewayEventFilter> eventFilters = new ArrayList<GatewayEventFilter>();
+  private List<GatewayEventFilter> eventFilters = new ArrayList<GatewayEventFilter>();
 
-  public ArrayList<GatewayTransportFilter> transFilters = new ArrayList<GatewayTransportFilter>();
+  private ArrayList<GatewayTransportFilter> transFilters = new ArrayList<GatewayTransportFilter>();
 
-  public List<AsyncEventListener> listeners = new ArrayList<AsyncEventListener>();
+  private List<AsyncEventListener> listeners = new ArrayList<AsyncEventListener>();
 
-  public GatewayEventSubstitutionFilter eventSubstitutionFilter;
+  private GatewayEventSubstitutionFilter eventSubstitutionFilter;
 
-  public String id;
+  private String id;
 
-  public int remoteDs = GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID;
+  private int remoteDs = GatewaySender.DEFAULT_DISTRIBUTED_SYSTEM_ID;
 
-  public LocatorDiscoveryCallback locatorDiscoveryCallback;
+  private LocatorDiscoveryCallback locatorDiscoveryCallback;
 
-  public boolean isDiskSynchronous = GatewaySender.DEFAULT_DISK_SYNCHRONOUS;
+  private boolean isDiskSynchronous = GatewaySender.DEFAULT_DISK_SYNCHRONOUS;
 
-  public OrderPolicy policy;
+  private OrderPolicy policy;
 
-  public int dispatcherThreads = GatewaySender.DEFAULT_DISPATCHER_THREADS;
+  private int dispatcherThreads = GatewaySender.DEFAULT_DISPATCHER_THREADS;
 
-  public int parallelism = GatewaySender.DEFAULT_PARALLELISM_REPLICATED_REGION;
+  private int parallelism = GatewaySender.DEFAULT_PARALLELISM_REPLICATED_REGION;
 
-  public boolean isParallel = GatewaySender.DEFAULT_IS_PARALLEL;
+  private boolean isParallel = GatewaySender.DEFAULT_IS_PARALLEL;
 
-  public boolean groupTransactionEvents = GatewaySender.DEFAULT_MUST_GROUP_TRANSACTION_EVENTS;
+  private boolean groupTransactionEvents = GatewaySender.DEFAULT_MUST_GROUP_TRANSACTION_EVENTS;
 
-  public boolean isForInternalUse = GatewaySender.DEFAULT_IS_FOR_INTERNAL_USE;
+  private boolean isForInternalUse = GatewaySender.DEFAULT_IS_FOR_INTERNAL_USE;
 
-  public boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED;
+  private boolean isBucketSorted = GatewaySenderAttributes.DEFAULT_IS_BUCKETSORTED;
 
-  public boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
+  private boolean isMetaQueue = GatewaySenderAttributes.DEFAULT_IS_META_QUEUE;
 
-  public boolean forwardExpirationDestroy = GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY;
+  private boolean forwardExpirationDestroy = GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY;
 
-  public boolean enforceThreadsConnectSameReceiver =
+  private boolean enforceThreadsConnectSameReceiver =
       GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER;
 
+  // Added due to "alter gateway-sender" command
+
+  private boolean modifyAlertThreshold = false;
+
+  private boolean modifyBatchSize = false;
+
+  private boolean modifyBatchTimeInterval = false;
+
+  private boolean modifyGroupTransactionEvents = false;
+
+  private boolean modifyGatewayEventFilter = false;
+
+  private boolean modifyGatewayTransportFilter = false;

Review comment:
       What is the purpose of these members and of the functions that get/set them? I have not found any use of them in the rest of the code.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderAlterOperationsDUnitTest.java
##########
@@ -0,0 +1,885 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.serial;
+
+import static java.lang.System.currentTimeMillis;
+import static java.util.Arrays.asList;
+import static org.apache.geode.cache.Region.SEPARATOR;
+import static org.apache.geode.cache.RegionShortcut.REPLICATE;
+import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_ORDER_POLICY;
+import static org.apache.geode.cache.wan.GatewaySender.DEFAULT_SOCKET_BUFFER_SIZE;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
+import static org.apache.geode.test.dunit.VM.getCurrentVMNum;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.apache.geode.test.dunit.VM.toArray;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.UseParametersRunnerFactory;
+
+import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStore;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionDestroyedException;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.wan.GatewayEventFilter;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
+import org.apache.geode.distributed.Locator;
+import org.apache.geode.distributed.internal.InternalLocator;
+import org.apache.geode.internal.cache.ForceReattemptException;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.GatewaySenderException;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.cache.wan.InternalGatewaySenderFactory;
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.AsyncInvocation;
+import org.apache.geode.test.dunit.IgnoredException;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.cache.CacheTestCase;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder;
+import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory;
+
+@Category(WanTest.class)
+@RunWith(Parameterized.class)
+@UseParametersRunnerFactory(CategoryWithParameterizedRunnerFactory.class)
+@SuppressWarnings("serial")
+public class SerialGatewaySenderAlterOperationsDUnitTest extends CacheTestCase {
+  private static final Logger logger = LogService.getLogger();
+
+  @Parameters(name = "{index}: numDispatchers={0}")
+  public static Collection<Integer> data() {
+    return asList(1, 3, 5);
+  }
+
+  @Parameter
+  public int numDispatchers;
+
+  private VM vm0;
+  private VM vm1;
+  private VM vm2;
+  private VM vm3;
+  private VM vm4;
+  private VM vm5;
+  private VM vm6;
+  private VM vm7;
+
+  private String className;
+
+  @Rule
+  public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder();
+
+  @Before
+  public void setUp() {
+    addIgnoredException("Broken pipe");
+    addIgnoredException("Connection refused");
+    addIgnoredException("Connection reset");
+    addIgnoredException("could not get remote locator information");
+    addIgnoredException("Software caused connection abort");
+    addIgnoredException("Unexpected IOException");
+
+    className = getClass().getSimpleName();
+
+    vm0 = getVM(0);
+    vm1 = getVM(1);
+    vm2 = getVM(2);
+    vm3 = getVM(3);
+    vm4 = getVM(4);
+    vm5 = getVM(5);
+    vm6 = getVM(6);
+    vm7 = getVM(7);
+
+    // Stopping the gateway closed the region, which causes this exception to get logged
+    addIgnoredException(RegionDestroyedException.class);
+  }
+
+  @Test
+  public void testStartPauseResumeSerialGatewaySenderUpdateAttributes() throws Exception {
+    int lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    int nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    for (VM vm : toArray(vm2, vm3)) {
+      vm.invoke(() -> {
+        createCache(nyPort);
+        createReceiver();
+      });
+    }
+
+    vm4.invoke(() -> createCache(lnPort));
+    vm5.invoke(() -> createCache(lnPort));
+    vm6.invoke(() -> createCache(lnPort));
+    vm7.invoke(() -> createCache(lnPort));
+
+    vm4.invoke(() -> createSenderInVm4());
+    vm5.invoke(() -> createSenderInVm5());
+
+    vm2.invoke(() -> createReplicatedRegion(className + "_RR", null));
+    vm3.invoke(() -> createReplicatedRegion(className + "_RR", null));
+
+    vm4.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+    vm5.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+    vm6.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+    vm7.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+
+    for (VM vm : toArray(vm4, vm5)) {
+      vm.invoke(() -> startSender("ln"));
+    }
+
+    vm4.invoke(() -> pauseSender("ln"));
+    vm5.invoke(() -> pauseSender("ln"));
+
+    vm4.invoke(() -> validateSenderPausedState("ln"));
+    vm5.invoke(() -> validateSenderPausedState("ln"));
+
+    vm4.invoke(() -> doPuts(className + "_RR", 1000));
+
+    updateBatchSize(50);
+    updateBatchTimeInterval(200);
+
+    vm4.invoke(() -> resumeSender("ln"));
+    vm5.invoke(() -> resumeSender("ln"));
+
+    vm4.invoke(() -> validateSenderResumedState("ln"));
+    vm5.invoke(() -> validateSenderResumedState("ln"));
+
+    checkBatchSize(50);
+    checkBatchTimeInterval(200);
+
+    vm4.invoke(() -> validateQueueContents("ln", 0));
+    vm5.invoke(() -> validateQueueContents("ln", 0));
+
+    vm2.invoke(() -> validateRegionSize(className + "_RR", 1000));
+    vm3.invoke(() -> validateRegionSize(className + "_RR", 1000));
+  }
+
+  @Test
+  public void testSerialGatewaySenderUpdateAttributesWhilePutting() throws Exception {
+    int lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    int nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    for (VM vm : toArray(vm2, vm3)) {
+      vm.invoke(() -> {
+        createCache(nyPort);
+        createReceiver();
+      });
+    }
+
+    vm4.invoke(() -> createCache(lnPort));
+    vm5.invoke(() -> createCache(lnPort));
+    vm6.invoke(() -> createCache(lnPort));
+    vm7.invoke(() -> createCache(lnPort));
+
+    vm4.invoke(() -> createSenderInVm4());
+    vm5.invoke(() -> createSenderInVm5());
+
+    vm2.invoke(() -> createReplicatedRegion(className + "_RR", null));
+    vm3.invoke(() -> createReplicatedRegion(className + "_RR", null));
+
+    vm4.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+    vm5.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+    vm6.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+    vm7.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+
+    for (VM vm : toArray(vm4, vm5)) {
+      vm.invoke(() -> startSender("ln"));
+    }
+
+    // Do some puts from both vm4 and vm5 while restarting a sender
+    AsyncInvocation doPutsInVm4 =
+        vm4.invokeAsync(() -> doPuts(className + "_RR", 1000));
+
+    updateBatchSize(50);
+    updateBatchTimeInterval(200);
+
+    doPutsInVm4.await();
+
+    checkBatchSize(50);
+    checkBatchTimeInterval(200);
+
+    vm4.invoke(() -> validateQueueContents("ln", 0));
+    vm5.invoke(() -> validateQueueContents("ln", 0));
+
+    vm2.invoke(() -> validateRegionSize(className + "_RR", 1000));
+    vm3.invoke(() -> validateRegionSize(className + "_RR", 1000));
+  }
+
+  @Test
+  public void testSerialGatewaySenderUpdateGatewayEventFiltersWhilePutting() throws Exception {
+    int lnPort = vm0.invoke(() -> createFirstLocatorWithDSId(1));
+    int nyPort = vm1.invoke(() -> createFirstRemoteLocator(2, lnPort));
+
+    List<GatewayEventFilter> filters = new ArrayList<>();
+    filters.add(new MyGatewayEventFilter_AfterAck());
+    filters.add(new PDXGatewayEventFilter());
+
+    for (VM vm : toArray(vm2, vm3)) {
+      vm.invoke(() -> {
+        createCache(nyPort);
+        createReceiver();
+      });
+    }
+
+    vm4.invoke(() -> createCache(lnPort));
+    vm5.invoke(() -> createCache(lnPort));
+    vm6.invoke(() -> createCache(lnPort));
+    vm7.invoke(() -> createCache(lnPort));
+
+    vm4.invoke(() -> createSenderInVm4());
+    vm5.invoke(() -> createSenderInVm5());
+
+    vm2.invoke(() -> createReplicatedRegion(className + "_RR", null));
+    vm3.invoke(() -> createReplicatedRegion(className + "_RR", null));
+
+    vm4.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+    vm5.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+    vm6.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+    vm7.invoke(() -> createReplicatedRegion(className + "_RR", "ln"));
+
+    for (VM vm : toArray(vm4, vm5)) {
+      vm.invoke(() -> startSender("ln"));
+    }
+
+    // Do some puts from both vm4 and vm5 while restarting a sender
+    AsyncInvocation doPutsInVm4 =
+        vm4.invokeAsync(() -> doPuts(className + "_RR", 5000));
+
+    updateBatchSize(40);
+    updateGatewayEventFilters(filters);
+
+    doPutsInVm4.await();
+
+    checkBatchSize(40);
+
+    vm4.invoke(() -> validateQueueContents("ln", 0));
+    vm5.invoke(() -> validateQueueContents("ln", 0));
+
+    vm2.invoke(() -> validateRegionSize(className + "_RR", 5000));
+    vm3.invoke(() -> validateRegionSize(className + "_RR", 5000));
+  }
+
+  protected boolean isOffHeap() {
+    return false;
+  }
+
+  protected void createSenderInVm4() throws IOException {
+    createSender("ln", 2, true, true, numDispatchers, DEFAULT_ORDER_POLICY);
+  }
+
+  protected void createSenderInVm5() throws IOException {
+    createSender("ln", 2, true, true, numDispatchers, DEFAULT_ORDER_POLICY);
+  }
+
+  protected final void createSender(String id,
+      int remoteDsId,
+      boolean isPersistent,
+      boolean isManualStart,
+      int numDispatchers,
+      OrderPolicy policy) throws IOException {
+    try (IgnoredException ie = addIgnoredException("Could not connect")) {
+      File persistentDirectory =
+          temporaryFolder.newFolder(id + "_disk_" + currentTimeMillis() + "_" + getCurrentVMNum());
+      DiskStoreFactory diskStoreFactory = getCache().createDiskStoreFactory();
+      File[] dirs = new File[] {persistentDirectory};
+
+      InternalGatewaySenderFactory gatewaySenderFactory =
+          (InternalGatewaySenderFactory) getCache().createGatewaySenderFactory();
+
+      gatewaySenderFactory.setParallel(false);
+      gatewaySenderFactory.setMaximumQueueMemory(100);
+      gatewaySenderFactory.setBatchSize(10);
+      gatewaySenderFactory.setBatchConflationEnabled(false);
+      gatewaySenderFactory.setManualStart(isManualStart);
+      gatewaySenderFactory.setDispatcherThreads(numDispatchers);
+      gatewaySenderFactory.setOrderPolicy(policy);
+      gatewaySenderFactory.setSocketBufferSize(DEFAULT_SOCKET_BUFFER_SIZE);
+
+      if (isPersistent) {
+        gatewaySenderFactory.setPersistenceEnabled(true);
+        gatewaySenderFactory.setDiskStoreName(
+            diskStoreFactory.setDiskDirs(dirs).create(id).getName());
+      } else {
+        DiskStore store = diskStoreFactory.setDiskDirs(dirs).create(id);
+        gatewaySenderFactory.setDiskStoreName(store.getName());
+      }
+
+      gatewaySenderFactory.create(id, remoteDsId);
+    }
+  }
+
+  private Properties getDistributedSystemProperties(int locatorPort) {
+    Properties props = getDistributedSystemProperties();
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
+    return props;
+  }
+
+  private void createCache(int locatorPort) {
+    getCache(getDistributedSystemProperties(locatorPort));
+  }
+
+  private void createReplicatedRegion(String regionName, String senderIds) {
+    try (IgnoredException ie1 = addIgnoredException(ForceReattemptException.class);
+        IgnoredException ie2 = addIgnoredException(GatewaySenderException.class);
+        IgnoredException ie3 = addIgnoredException(InterruptedException.class)) {
+      RegionFactory regionFactory = getCache().createRegionFactory(REPLICATE);
+
+      if (senderIds != null) {
+        StringTokenizer tokenizer = new StringTokenizer(senderIds, ",");
+        while (tokenizer.hasMoreTokens()) {
+          String senderId = tokenizer.nextToken();
+          regionFactory.addGatewaySenderId(senderId);
+        }
+      }
+
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+      regionFactory.setOffHeap(isOffHeap());
+
+      regionFactory.create(regionName);
+    }
+  }
+
+  private void doPuts(String regionName, int count) {
+    try (IgnoredException ie1 = addIgnoredException(GatewaySenderException.class);
+        IgnoredException ie2 = addIgnoredException(InterruptedException.class)) {
+      Region<Number, String> region = getCache().getRegion(SEPARATOR + regionName);
+      for (int i = 0; i < count; i++) {
+        region.put(i, "Value_" + i);
+      }
+    }
+  }
+
+  private void doPuts(String regionName, int from, int count) {
+    Region<Number, String> region = getCache().getRegion(SEPARATOR + regionName);
+    for (int i = from; i < count; i++) {
+      region.put(i, "Value_" + i);
+    }
+  }
+
+  private int createFirstLocatorWithDSId(int systemId) {
+    stopOldLocator();
+    int locatorPort = getRandomAvailableTCPPort();
+    startLocator(systemId, locatorPort, locatorPort, -1, true);
+    return locatorPort;
+  }
+
+  private int createFirstRemoteLocator(int systemId, int remoteLocatorPort) {
+    stopOldLocator();
+    int locatorPort = getRandomAvailableTCPPort();
+    startLocator(systemId, locatorPort, locatorPort, remoteLocatorPort, true);
+    return locatorPort;
+  }
+
+  private void startLocator(int systemId, int locatorPort, int startLocatorPort,
+      int remoteLocatorPort, boolean startServerLocator) {
+    Properties props = getDistributedSystemProperties();
+
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(systemId));
+    props.setProperty(MCAST_PORT, "0");
+    props.setProperty(LOCATORS, "localhost[" + locatorPort + "]");
+    props.setProperty(START_LOCATOR, "localhost[" + startLocatorPort + "],server="
+        + startServerLocator + ",peer=true,hostname-for-clients=localhost");
+
+    if (remoteLocatorPort != -1) {
+      props.setProperty(REMOTE_LOCATORS, "localhost[" + remoteLocatorPort + "]");
+    }
+
+    // Start start the locator with a LOCATOR_DM_TYPE and not a NORMAL_DM_TYPE
+    System.setProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE, "true");
+    try {
+      getSystem(props);
+    } finally {
+      System.clearProperty(InternalLocator.FORCE_LOCATOR_DM_TYPE);
+    }
+  }
+
+  private void stopOldLocator() {
+    if (Locator.hasLocator()) {
+      Locator.getLocator().stop();
+    }
+  }
+
+  private InternalGatewaySender findInternalGatewaySender(String senderId) {
+    return (InternalGatewaySender) findGatewaySender(senderId, true);
+  }
+
+  private GatewaySender findGatewaySender(String senderId) {
+    return findGatewaySender(senderId, true);
+  }
+
+  private GatewaySender findGatewaySender(String senderId, boolean assertNotNull) {
+    Set<GatewaySender> senders = getCache().getGatewaySenders();
+    GatewaySender sender = null;
+    for (GatewaySender s : senders) {
+      if (s.getId().equals(senderId)) {
+        sender = s;
+        break;
+      }
+    }
+
+    if (assertNotNull) {
+      assertThat(sender).isNotNull();
+    }
+
+    return sender;
+  }
+
+  private void startSender(String senderId) {
+    try (IgnoredException ie1 = addIgnoredException("Could not connect");
+        IgnoredException ie2 = addIgnoredException(ForceReattemptException.class);
+        IgnoredException ie3 = addIgnoredException(InterruptedException.class)) {
+      findGatewaySender(senderId).start();
+    }
+  }
+
+  private void pauseSender(String senderId) {
+    try (IgnoredException ie1 = addIgnoredException("Could not connect");
+        IgnoredException ie2 = addIgnoredException(ForceReattemptException.class)) {
+      InternalGatewaySender sender = findInternalGatewaySender(senderId);
+      sender.pause();
+      sender.getEventProcessor().waitForDispatcherToPause();
+    }
+  }
+
+  private void resumeSender(String senderId) {
+    try (IgnoredException ie1 = addIgnoredException("Could not connect");
+        IgnoredException ie2 = addIgnoredException(ForceReattemptException.class)) {
+      findGatewaySender(senderId).resume();
+    }
+  }
+
+  private void stopSender(String senderId) {
+    try (IgnoredException ie1 = addIgnoredException("Could not connect");
+        IgnoredException ie2 = addIgnoredException(ForceReattemptException.class)) {
+      InternalGatewaySender sender = findInternalGatewaySender(senderId);
+
+      AbstractGatewaySenderEventProcessor eventProcessor = sender.getEventProcessor();
+
+      sender.stop();
+
+      if (eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor) {
+        ConcurrentSerialGatewaySenderEventProcessor concurrentEventProcessor =
+            (ConcurrentSerialGatewaySenderEventProcessor) eventProcessor;
+        Set<RegionQueue> queues = concurrentEventProcessor.getQueues();
+        for (RegionQueue queue : queues) {
+          if (queue instanceof SerialGatewaySenderQueue) {
+            assertThat(((SerialGatewaySenderQueue) queue).isRemovalThreadAlive())
+                .isFalse();
+          }
+        }
+      }
+    }
+  }
+
+  private int createReceiver() throws IOException {
+    int receiverPort = getRandomAvailableTCPPort();
+
+    GatewayReceiverFactory gatewayReceiverFactory = getCache().createGatewayReceiverFactory();
+    gatewayReceiverFactory.setStartPort(receiverPort);
+    gatewayReceiverFactory.setEndPort(receiverPort);
+    gatewayReceiverFactory.setManualStart(true);
+
+    GatewayReceiver receiver = gatewayReceiverFactory.create();
+    receiver.start();
+
+    return receiverPort;
+  }
+
+  private void validateRegionSize(String regionName, int regionSize) {
+    try (IgnoredException ie1 = addIgnoredException(CacheClosedException.class);
+        IgnoredException ie2 = addIgnoredException(ForceReattemptException.class)) {
+      Region region = getCache().getRegion(SEPARATOR + regionName);
+
+      await()
+          .untilAsserted(() -> {
+            assertThat(region.keySet()).hasSize(regionSize);
+          });
+    }
+  }
+
+  private void validateQueueContents(String senderId, int regionSize) {
+    try (IgnoredException ie1 = addIgnoredException(GatewaySenderException.class);
+        IgnoredException ie2 = addIgnoredException(InterruptedException.class)) {
+      InternalGatewaySender sender = findInternalGatewaySender(senderId);
+
+      if (sender.isParallel()) {
+        RegionQueue regionQueue = sender.getQueues().toArray(new RegionQueue[1])[0];
+        await()
+            .untilAsserted(() -> {
+              assertThat(regionQueue.size())
+                  .isEqualTo(regionSize);
+            });
+
+      } else {
+        Set<RegionQueue> queues = sender.getQueues();
+        await()
+            .untilAsserted(() -> {
+              int size = 0;
+              for (RegionQueue queue : queues) {
+                size += queue.size();
+              }
+              assertThat(size)
+                  .isEqualTo(regionSize);
+            });
+      }
+    }
+  }
+
+  private void validateSenderPausedState(String senderId) {
+    GatewaySender sender = findGatewaySender(senderId);
+
+    assertThat(sender.isPaused())
+        .isTrue();
+  }
+
+  private void validateSenderResumedState(String senderId) {
+    GatewaySender sender = findGatewaySender(senderId);
+
+    assertThat(sender.isPaused())
+        .isFalse();
+    assertThat(sender.isRunning())
+        .isTrue();
+  }
+
+  private void updateBatchSize(int batchsize) {
+    vm4.invoke(() -> {
+      AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln");
+      boolean paused = false;
+      if (sender.isRunning() && !sender.isPaused()) {
+        sender.pause();
+        paused = true;

Review comment:
       Same comment as the one on the ParallelGatewaySenderOperationsDUnitTest changes: if the gateway sender must be paused while parameters are changed, that needs to be documented. Otherwise, I would remove the pause here and in the other parameter changes below.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
##########
@@ -1023,6 +1238,268 @@ private void createPartitionedRegions(boolean createAccessors) {
     vm3.invoke(() -> createPartitionedRegion(regionName, "ln", 1, 100, isOffHeap()));
   }
 
+  private void updateBatchSize(int batchsize) {
+    vm4.invoke(() -> {
+      AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender("ln");
+      boolean paused = false;
+      if (sender.isRunning() && !sender.isPaused()) {

Review comment:
       Is it necessary for the sender to be paused while the parameters are changed? If so, that should be stated in the documentation. Otherwise, I would remove the pause here and in the other calls where a gateway sender parameter is changed.

##########
File path: geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/AlterGatewaySenderCommand.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.Logger;
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.configuration.CacheConfig;
+import org.apache.geode.cache.configuration.DeclarableType;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.SingleGfshCommand;
+import org.apache.geode.management.internal.cli.functions.AlterGatewaySenderFunction;
+import org.apache.geode.management.internal.cli.functions.GatewaySenderFunctionArgs;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.exceptions.EntityNotFoundException;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.management.internal.security.ResourceOperation;
+import org.apache.geode.security.ResourcePermission;
+
+public class AlterGatewaySenderCommand extends SingleGfshCommand {
+  private final AlterGatewaySenderFunction alterGatewaySenderFunction =
+      new AlterGatewaySenderFunction();
+  private static final Logger logger = LogService.getLogger();
+
+  /**
+   * Alters attributes of an existing gateway sender on the selected members.
+   *
+   * <p>
+   * Only the options that were actually supplied are copied into the configuration object that
+   * is sent to {@link AlterGatewaySenderFunction}. The same object is attached to the returned
+   * result via {@code setConfigObject}.
+   *
+   * @param senderId id of the gateway sender to alter; surrounding whitespace is trimmed
+   * @param onGroup member groups passed to {@code findMembers} to select the target members
+   * @param onMember member ids/names passed to {@code findMembers} to select the target members
+   * @param alertThreshold new alert threshold, or {@code null} to leave it untouched
+   * @param batchSize new batch size, or {@code null} to leave it untouched
+   * @param batchTimeInterval new batch time interval, or {@code null} to leave it untouched
+   * @param gatewayEventFilters gateway event filters to set, or {@code null} to leave them
+   *        untouched; a single value equal (ignoring case) to {@link CliStrings#NULL} adds no
+   *        filter
+   * @param groupTransactionEvents new group-transaction-events value, or {@code null} to leave
+   *        it untouched
+   * @return an error result if the cluster configuration service is unavailable, if
+   *         group-transaction-events cannot be enabled for the existing sender, if no member
+   *         was found, or if no alterable option was supplied; otherwise the member status
+   *         result built from the function results
+   * @throws EntityNotFoundException if {@code findGW} finds no gateway sender with the given id
+   */
+  @CliCommand(value = CliStrings.ALTER_GATEWAYSENDER,
+      help = CliStrings.ALTER_GATEWAYSENDER__HELP)
+  @CliMetaData(relatedTopic = CliStrings.TOPIC_GEODE_WAN)
+  @ResourceOperation(resource = ResourcePermission.Resource.CLUSTER,
+      operation = ResourcePermission.Operation.MANAGE, target = ResourcePermission.Target.GATEWAY)
+  public ResultModel alterGatewaySender(@CliOption(key = CliStrings.ALTER_GATEWAYSENDER__ID,
+      mandatory = true, optionContext = ConverterHint.GATEWAY_SENDER_ID,
+      help = CliStrings.ALTER_GATEWAYSENDER__ID__HELP) String senderId,
+      @CliOption(key = {CliStrings.GROUP, CliStrings.GROUPS},
+          optionContext = ConverterHint.MEMBERGROUP,
+          help = CliStrings.ALTER_GATEWAYSENDER__GROUP__HELP) String[] onGroup,
+      @CliOption(key = {CliStrings.MEMBER, CliStrings.MEMBERS},
+          optionContext = ConverterHint.MEMBERIDNAME,
+          help = CliStrings.ALTER_GATEWAYSENDER__MEMBER__HELP) String[] onMember,
+      @CliOption(key = CliStrings.ALTER_GATEWAYSENDER__ALERTTHRESHOLD,
+          help = CliStrings.ALTER_GATEWAYSENDER__ALERTTHRESHOLD__HELP) Integer alertThreshold,
+      @CliOption(key = CliStrings.ALTER_GATEWAYSENDER__BATCHSIZE,
+          help = CliStrings.ALTER_GATEWAYSENDER__BATCHSIZE__HELP) Integer batchSize,
+      @CliOption(key = CliStrings.ALTER_GATEWAYSENDER__BATCHTIMEINTERVAL,
+          help = CliStrings.ALTER_GATEWAYSENDER__BATCHTIMEINTERVAL__HELP) Integer batchTimeInterval,
+      @CliOption(key = CliStrings.ALTER_GATEWAYSENDER__GATEWAYEVENTFILTER,
+          specifiedDefaultValue = CliStrings.NULL,
+          help = CliStrings.ALTER_GATEWAYSENDER__GATEWAYEVENTFILTER__HELP) String[] gatewayEventFilters,
+      @CliOption(key = CliStrings.ALTER_GATEWAYSENDER__GROUPTRANSACTIONEVENTS,
+          specifiedDefaultValue = "true",
+          help = CliStrings.ALTER_GATEWAYSENDER__GROUPTRANSACTIONEVENTS__HELP) Boolean groupTransactionEvents)
+      throws EntityNotFoundException {
+
+    // There is no need to check whether any running server has this gateway sender: a server
+    // with this gateway sender id may be shut down, but the cluster configuration still has to
+    // be updated.
+    if (getConfigurationPersistenceService() == null) {
+      return ResultModel.createError("Cluster Configuration Service is not available. "
+          + "Please connect to a locator with running Cluster Configuration Service.");
+    }
+
+    final String id = senderId.trim();
+
+    CacheConfig.GatewaySender oldConfiguration = findGW(id);
+
+    if (oldConfiguration == null) {
+      String message = String.format("Cannot find a gateway sender with id '%s'.", id);
+      throw new EntityNotFoundException(message);
+    }
+
+    // mustGroupTransactionEvents() is null-checked in updateConfigForGroup, so it can return
+    // null when the attribute was never set. Comparing against Boolean.TRUE avoids the
+    // NullPointerException that auto-unboxing such a value would raise; an unset flag is
+    // treated as "false".
+    boolean enablingGrouping = Boolean.TRUE.equals(groupTransactionEvents)
+        && !Boolean.TRUE.equals(oldConfiguration.mustGroupTransactionEvents());
+    if (enablingGrouping) {
+      // NOTE(review): isParallel() and isEnableBatchConflation() are assumed to be nullable as
+      // well; the null-safe comparison is harmless if they return primitives - confirm.
+      boolean parallel = Boolean.TRUE.equals(oldConfiguration.isParallel());
+      if (!parallel && (oldConfiguration.getDispatcherThreads() == null
+          || Integer.parseInt(oldConfiguration.getDispatcherThreads()) > 1)) {
+        return ResultModel.createError(
+            "alter-gateway-sender cannot be performed for --group-transaction-events attribute if serial sender and dispatcher-threads is greater than 1.");
+      }
+
+      if (Boolean.TRUE.equals(oldConfiguration.isEnableBatchConflation())) {
+        return ResultModel.createError(
+            "alter-gateway-sender cannot be performed for --group-transaction-events attribute if batch-conflation is enabled.");
+      }
+    }
+
+    Set<DistributedMember> dsMembers = findMembers(onGroup, onMember);
+
+    if (dsMembers.isEmpty()) {
+      return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
+    }
+
+    // Carries only the attributes that were supplied on the command line.
+    CacheConfig.GatewaySender gwConfiguration = new CacheConfig.GatewaySender();
+    gwConfiguration.setId(id);
+
+    boolean modify = false;
+
+    if (alertThreshold != null) {
+      modify = true;
+      gwConfiguration.setAlertThreshold(alertThreshold.toString());
+    }
+
+    if (batchSize != null) {
+      modify = true;
+      gwConfiguration.setBatchSize(batchSize.toString());
+    }
+
+    if (batchTimeInterval != null) {
+      modify = true;
+      gwConfiguration.setBatchTimeInterval(batchTimeInterval.toString());
+    }
+
+    if (groupTransactionEvents != null) {
+      modify = true;
+      gwConfiguration.setGroupTransactionEvents(groupTransactionEvents);
+    }
+
+    if (gatewayEventFilters != null) {
+      modify = true;
+      if (gatewayEventFilters.length == 1
+          && gatewayEventFilters[0].equalsIgnoreCase(CliStrings.NULL)) {
+        // Called only for its side effect. NOTE(review): presumably the getter lazily creates
+        // the (empty) filter list, so areGatewayEventFiltersUpdated() reports true and the
+        // existing filters get cleared - confirm against CacheConfig.GatewaySender.
+        gwConfiguration.getGatewayEventFilters();
+      } else {
+        gwConfiguration.getGatewayEventFilters()
+            .addAll((stringsToDeclarableTypes(gatewayEventFilters)));
+      }
+    }
+
+    if (!modify) {
+      return ResultModel.createError(CliStrings.ALTER_GATEWAYSENDER__RELEVANT__OPTION__MESSAGE);
+    }
+
+    GatewaySenderFunctionArgs gatewaySenderFunctionArgs =
+        new GatewaySenderFunctionArgs(gwConfiguration);
+
+    List<CliFunctionResult> gatewaySenderAlterResults =
+        executeAndGetFunctionResult(alterGatewaySenderFunction, gatewaySenderFunctionArgs,
+            dsMembers);
+
+    ResultModel resultModel = ResultModel.createMemberStatusResult(gatewaySenderAlterResults);
+
+    // Attach the changes to the result; presumably the framework later hands them to
+    // updateConfigForGroup - confirm against SingleGfshCommand.
+    resultModel.setConfigObject(gwConfiguration);
+
+    return resultModel;
+  }
+
+  /**
+   * Copies the altered attributes carried by {@code configObject} onto every gateway sender in
+   * {@code config} whose id equals the id of {@code configObject}.
+   *
+   * <p>
+   * Batch size, batch time interval and alert threshold are copied only when they are not
+   * blank, and group-transaction-events only when it is not {@code null}. When the gateway
+   * event filters are reported as updated, the existing filters are replaced by the new ones.
+   *
+   * @param group not used by this implementation
+   * @param config cache configuration whose gateway senders are updated in place
+   * @param configObject the {@link CacheConfig.GatewaySender} holding the new attribute values
+   * @return {@code true} if at least one gateway sender with a matching id was found,
+   *         {@code false} otherwise
+   */
+  @Override
+  public boolean updateConfigForGroup(String group, CacheConfig config, Object configObject) {
+    List<CacheConfig.GatewaySender> existingSenders = config.getGatewaySenders();
+    if (existingSenders.isEmpty()) {
+      return false;
+    }
+
+    CacheConfig.GatewaySender changes = (CacheConfig.GatewaySender) configObject;
+    String targetId = changes.getId();
+    boolean anySenderMatched = false;
+
+    for (CacheConfig.GatewaySender existing : existingSenders) {
+      if (!targetId.equals(existing.getId())) {
+        continue;
+      }
+      anySenderMatched = true;
+
+      String newBatchSize = changes.getBatchSize();
+      if (StringUtils.isNotBlank(newBatchSize)) {
+        existing.setBatchSize(newBatchSize);
+      }
+
+      String newBatchTimeInterval = changes.getBatchTimeInterval();
+      if (StringUtils.isNotBlank(newBatchTimeInterval)) {
+        existing.setBatchTimeInterval(newBatchTimeInterval);
+      }
+
+      String newAlertThreshold = changes.getAlertThreshold();
+      if (StringUtils.isNotBlank(newAlertThreshold)) {
+        existing.setAlertThreshold(newAlertThreshold);
+      }
+
+      if (changes.mustGroupTransactionEvents() != null) {
+        existing.setGroupTransactionEvents(changes.mustGroupTransactionEvents());
+      }
+
+      if (!changes.areGatewayEventFiltersUpdated()) {
+        continue;
+      }
+      // Replace the filters: drop the current ones first, then add the new ones (if any).
+      if (!existing.getGatewayEventFilters().isEmpty()) {
+        existing.getGatewayEventFilters().clear();
+      }
+      if (!changes.getGatewayEventFilters().isEmpty()) {
+        existing.getGatewayEventFilters().addAll(changes.getGatewayEventFilters());
+      }
+    }
+    return anySenderMatched;
+  }
+
+  private CacheConfig.GatewaySender findGW(String gwId) {
+    CacheConfig.GatewaySender gwsender = null;
+    InternalConfigurationPersistenceService ccService =
+        (InternalConfigurationPersistenceService) this.getConfigurationPersistenceService();
+    if (ccService == null) {
+      return null;
+    }
+
+    Set<String> groups = ccService.getGroups();
+    gwsender = null;

Review comment:
       This line is redundant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org