Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2022/02/17 09:11:36 UTC

[GitHub] [geode] jvarenina opened a new pull request #7378: GEODE-10056: Work in progress

jvarenina opened a new pull request #7378:
URL: https://github.com/apache/geode/pull/7378


   <!-- Thank you for submitting a contribution to Apache Geode. -->
   
   <!-- In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken: 
   -->
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `develop`)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   - [ ] Does `gradlew build` run cleanly?
   
   - [ ] Have you written or updated unit tests to verify your changes?
   
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   
   <!-- Note:
   Please ensure that once the PR is submitted, check Concourse for build issues and
   submit an update to your PR as soon as possible. If you need help, please send an
   email to dev@geode.apache.org.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jvarenina commented on pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
jvarenina commented on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1072272422


   Hi @boglesby,
   
   I also assumed that the same race condition is possible for client connections, but I haven't tried to reproduce it. Thanks for pointing this out and for the other valuable information. Also, thank you for the extensive testing you have done.
   
   If we decide to go with this solution, I agree that we should make the load-poll-interval parameter configurable for gateway receivers. Lowering it would slightly mitigate the effects of the race condition.
   
   The load-balance gateways command works on the server this way:
   - pauses the gateway-sender
   - destroys all connections and then relies on the mechanism used during connection creation (ClientConnectionRequest/Response) to balance the load better
   - resumes the gateway-sender
   
   This command will again result in a burst of connection requests that could hit the race condition.
   
   Maybe instead of sending load information periodically from the servers, the locator could scrape it (perhaps using CacheServerMXBean) from the servers and apply it simultaneously for all receivers in the locator. The locator could fetch the load when it receives a connection request and its current connection load is stale (e.g., older than 200 ms), as we don't expect many connections from gateway-senders. This way, the locator would at least have an up-to-date connection load taken at a similar time on all servers. This solution should even catch the change in connection load when the load-balance command destroys all connections.
   
   Maybe an algorithm could work this way:
   - When a connection request is received, check whether the connection load is stale (older than a new parameter, load-update-frequency=200ms)
     - if yes, then try to get the connection load from all servers asynchronously
       - if the load is received from all servers, then apply it in the locator
       - if any get fails, then check profiles again and immediately retry for all servers
     - use the current load immediately
   - If no connection request is received, just get the load periodically, e.g., every 5 seconds (load-poll-interval)
   
   I'm not sure whether this makes sense, as I don't know how fast the locator can scrape the load. I can create a prototype if you think this could work.





[GitHub] [geode] jvarenina edited a comment on pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
jvarenina edited a comment on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1048513322


   Hi reviewers,
   
   With this solution, each server now sends a CacheServerLoadMessage containing the correct connection load of the gateway-receiver to all locators in the cluster. This happens every 5 seconds, as configured by the load-poll-interval parameter. Additionally, the coordinator locator increases the load each time it provides the server location to the remote gateway-sender in a ClientConnectionRequest/ClientConnectionResponse exchange. The locator maintains this load only temporarily, until the next CacheServerLoadMessage is received. This makes sense because the server tracks connection load more accurately than the locator: the locator only increases the connection load based on received connection requests, while the server adjusts it each time a connection is established or closed.
   
   ClientConnectionRequest messages are usually sent to the locator in bursts when the gateway-sender is establishing connections due to traffic. This results in the locator's connection load running well ahead of the servers' connection load, because the servers have not established those connections yet. Suppose that during such a burst a CacheServerLoadMessage arrives at the locator carrying a low load value for one of the gateway-receivers. That receiver will then be picked more frequently (it will have the lowest load), resulting in unbalanced gateway-sender connections. For this to have a big impact on the load balancing of sender connections, the gateway-receivers must be started with a small delay between them, so that their CacheServerLoadMessages arrive far enough apart to cause an imbalance. If the CacheServerLoadMessages were sent at a similar time, this would not be a problem, as all messages would carry a similar load and would update the locator at a similar time.
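   
   To make the race easier to follow, here is a small toy model of it in Java. It is only a sketch and not Geode code: the class `LocatorLoadModel`, its methods, the three receivers `r1`..`r3`, the burst size of 30, and the 1/800 load per connection are all assumptions made for illustration.
   
   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;
   
   /** Toy model of the locator's per-receiver connection load; not Geode code. */
   final class LocatorLoadModel {
     // Assumed load added per connection (1 / max-connections, assumed to be 800).
     static final float LOAD_PER_CONNECTION = 1f / 800;
   
     private final Map<String, Float> load = new LinkedHashMap<>();
     private final Map<String, Integer> picks = new LinkedHashMap<>();
   
     LocatorLoadModel(String... receivers) {
       for (String receiver : receivers) {
         load.put(receiver, 0f);
         picks.put(receiver, 0);
       }
     }
   
     /** Connection request: choose the least-loaded receiver and bump its load. */
     String pickAndIncrement() {
       String best = null;
       for (Map.Entry<String, Float> entry : load.entrySet()) {
         if (best == null || entry.getValue() < load.get(best)) {
           best = entry.getKey();
         }
       }
       load.merge(best, LOAD_PER_CONNECTION, Float::sum);
       picks.merge(best, 1, Integer::sum);
       return best;
     }
   
     /** Periodic server report: overwrites whatever the locator has tracked so far. */
     void applyServerReport(String receiver, float reportedLoad) {
       load.put(receiver, reportedLoad);
     }
   
     int picksFor(String receiver) {
       return picks.get(receiver);
     }
   
     /** Runs a burst of 30 requests; optionally a stale report for r1 arrives mid-burst. */
     static LocatorLoadModel runBurst(boolean staleReportMidBurst) {
       LocatorLoadModel model = new LocatorLoadModel("r1", "r2", "r3");
       for (int i = 0; i < 30; i++) {
         if (staleReportMidBurst && i == 15) {
           // r1 has not accepted the announced connections yet, so it still reports 0.
           model.applyServerReport("r1", 0f);
         }
         model.pickAndIncrement();
       }
       return model;
     }
   
     public static void main(String[] args) {
       LocatorLoadModel balanced = runBurst(false);
       LocatorLoadModel skewed = runBurst(true);
       System.out.println("no report:    r1=" + balanced.picksFor("r1")
           + " r2=" + balanced.picksFor("r2") + " r3=" + balanced.picksFor("r3"));
       System.out.println("stale report: r1=" + skewed.picksFor("r1")
           + " r2=" + skewed.picksFor("r2") + " r3=" + skewed.picksFor("r3"));
     }
   }
   ```
   
   In this model the burst without a report hands out 10 connections per receiver, while the burst with the stale report for `r1` hands out 14, 8 and 8. Real timings will differ, but the mechanism is the same: one receiver's load is reset while the others keep the locator's higher estimate.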
   
   I would be really grateful if you could share your opinion on this matter.





[GitHub] [geode] jvarenina edited a comment on pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
jvarenina edited a comment on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1072272422


   Hi @boglesby,
   
   I also assumed that the same race condition is possible for client connections, but I haven't tried to reproduce it. Thanks for pointing this out and for the other valuable information. Also, thank you for the extensive testing you have done.
   
   If we decide to go with this solution, I agree that we should make the load-poll-interval parameter configurable for gateway receivers. Lowering it would slightly mitigate the effects of the race condition.
   
   The load-balance gateways command works on the server this way:
   - pauses the gateway-sender
   - destroys all connections and then relies on the mechanism used during connection creation (ClientConnectionRequest/Response) to balance the load better
   - resumes the gateway-sender
   
   This command will again result in a burst of connection requests that could hit the race condition.
   
   Maybe instead of sending load information periodically from the servers, the locator could scrape it (perhaps using CacheServerMXBean) from the servers and apply it simultaneously for all receivers in the locator. The locator could fetch the load when it receives a connection request and its current connection load is stale (e.g., older than 200 ms), as we don't expect many connections from gateway-senders. This way, the locator would at least have an up-to-date connection load taken at a similar time on all servers. This solution should even catch the change in connection load when the load-balance command destroys all connections.
   
   Maybe an algorithm could work this way:
   - When a connection request is received, check whether the connection load is stale (older than a new parameter, load-update-frequency=200ms)
     - if yes, then try to get the connection load from all servers asynchronously
       - if the load is received from all servers, then apply it in the locator
       - if any get fails, then check profiles again and immediately retry for all servers
     - use the current load immediately
   - If no connection request is received, just get the load periodically, e.g., every 5 seconds (load-poll-interval)
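   
   A rough sketch of the request-driven part of that flow in Java, just to make the idea concrete. Everything in it is hypothetical: `OnDemandLoadRefresher`, the `fetchLoad` function (standing in for whatever remote read the locator would use), the injected clock and executor, and the bounded `maxRetries` are assumptions, and the periodic fallback poll is left out.
   
   ```java
   import java.util.Arrays;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.Executor;
   import java.util.concurrent.atomic.AtomicBoolean;
   import java.util.function.Function;
   import java.util.function.LongSupplier;
   
   /** Hypothetical on-demand load refresh for a locator; not an existing Geode class. */
   final class OnDemandLoadRefresher {
     private final List<String> servers;
     private final Function<String, Float> fetchLoad; // assumed remote read of one server's load
     private final Executor executor;
     private final LongSupplier clockMillis;
     private final long staleAfterMillis; // the proposed load-update-frequency
     private final int maxRetries; // assumption: bound the "immediately retry" step
     private final AtomicBoolean refreshing = new AtomicBoolean();
   
     private volatile Map<String, Float> snapshot = new HashMap<>();
     private volatile long snapshotTimeMillis = -1; // -1 means "never loaded"
   
     OnDemandLoadRefresher(List<String> servers, Function<String, Float> fetchLoad,
         Executor executor, LongSupplier clockMillis, long staleAfterMillis, int maxRetries) {
       this.servers = servers;
       this.fetchLoad = fetchLoad;
       this.executor = executor;
       this.clockMillis = clockMillis;
       this.staleAfterMillis = staleAfterMillis;
       this.maxRetries = maxRetries;
     }
   
     /** Connection request: start a refresh if the load is stale, then use the current load. */
     Map<String, Float> onConnectionRequest() {
       if (isStale() && refreshing.compareAndSet(false, true)) {
         refreshAsync(maxRetries).whenComplete((done, error) -> refreshing.set(false));
       }
       return snapshot; // do not wait for the refresh to finish
     }
   
     boolean isStale() {
       long loadedAt = snapshotTimeMillis;
       return loadedAt < 0 || clockMillis.getAsLong() - loadedAt >= staleAfterMillis;
     }
   
     /** Fetches the load from all servers and applies it only if every fetch succeeded. */
     CompletableFuture<Void> refreshAsync(int retriesLeft) {
       Map<String, CompletableFuture<Float>> pending = new HashMap<>();
       for (String server : servers) {
         pending.put(server, CompletableFuture.supplyAsync(() -> fetchLoad.apply(server), executor));
       }
       CompletableFuture<Void> all =
           CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0]));
       return all.handle((ignored, failure) -> failure == null).thenCompose(allSucceeded -> {
         if (allSucceeded) {
           Map<String, Float> fresh = new HashMap<>();
           pending.forEach((server, load) -> fresh.put(server, load.join()));
           snapshot = fresh; // one swap, so all receivers are updated together
           snapshotTimeMillis = clockMillis.getAsLong();
         } else if (retriesLeft > 0) {
           return refreshAsync(retriesLeft - 1);
         }
         return CompletableFuture.<Void>completedFuture(null);
       });
     }
   
     public static void main(String[] args) {
       // Runnable::run executes the fetches on the calling thread, which keeps the demo simple.
       OnDemandLoadRefresher refresher = new OnDemandLoadRefresher(
           Arrays.asList("server-1", "server-2"), server -> 0.0075f,
           Runnable::run, System::currentTimeMillis, 200, 1);
       System.out.println(refresher.onConnectionRequest());
     }
   }
   ```
   
   The `refreshing` flag is there so that a burst of requests arriving while the load is stale starts only one refresh. The periodic 5-second poll could call `refreshAsync` from a scheduler in the same way.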
   
   I'm not sure whether this makes sense, as I don't know how fast the locator can scrape the load. I can create a prototype if you think this could work.





[GitHub] [geode] jvarenina commented on a change in pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
jvarenina commented on a change in pull request #7378:
URL: https://github.com/apache/geode/pull/7378#discussion_r820829842



##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+  private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;

Review comment:
       It was determined this way:
   
   1) I added +2 because of possible ping connections. There are two servers with a gateway-sender configured, so there could be up to two additional ping connections per gateway-receiver.
   
   2) I added +1 because there are 20 gateway-sender dispatcher threads configured on each of the two servers, so the 40 threads in total do not divide evenly among the 3 receiver servers, and the number of connections can differ by one between receivers.
   
   3) I added +1 just to be on the safe side.
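   
   The same arithmetic written out as a small Java sketch. The class and method names are made up for illustration; the inputs (2 sender servers, 20 dispatcher threads each, 3 receivers, a safety margin of 1) are the ones described above.
   
   ```java
   /** Reproduces the arithmetic behind NUM_CONNECTION_PER_SERVER_OFFSET; illustration only. */
   final class ConnectionOffsetSketch {
   
     static int offset(int senderServers, int dispatcherThreads, int receivers, int safetyMargin) {
       // Up to one ping connection from each sender server to each receiver.
       int pingConnections = senderServers;
       // 2 * 20 = 40 dispatcher connections in total.
       int totalDispatcherConnections = senderServers * dispatcherThreads;
       // 40 does not divide evenly by 3, so receivers can differ by one connection.
       int unevenSplit = (totalDispatcherConnections % receivers == 0) ? 0 : 1;
       return pingConnections + unevenSplit + safetyMargin;
     }
   
     public static void main(String[] args) {
       System.out.println(offset(2, 20, 3, 1)); // prints 4
     }
   }
   ```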
   
   I know this is not perfect, as there is still a possible race condition, explained here https://github.com/apache/geode/pull/7378#issuecomment-1048513322, that could cause a connection imbalance between receivers. Regardless, I decided to push this solution because these tests reproduce the issue, and I was hoping to get some feedback or advice from the community before trying to further improve the tests and the solution, e.g.: Why was it implemented like this in the first place? Is there a better way to solve this issue? Is the solution in this PR acceptable?
   
   I really appreciate your feedback on this PR, and I will apply your comments and push the commit as soon as possible.







[GitHub] [geode] boglesby commented on pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
boglesby commented on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1077908228


   That's a pretty cool idea. I'm not sure whether the CacheServerMXBean has that behavior, but I guess it could be added. In any event, I think this change is good. I'm approving this change, but you need to address the ParallelGatewaySenderConnectionLoadBalanceDistributedTest failure.





[GitHub] [geode] boglesby commented on pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
boglesby commented on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1071176231


   I ran a few tests with some extra logging on these changes. They look good.
   
   #### The receiver exchanges profiles with the locator:
   ```
   [warn 2022/03/16 14:16:12.440 PDT locator-ln <Pooled High Priority Message Processor 2> tid=0x50] XXX LocatorLoadSnapshot.updateConnectionLoadMap location=192.168.1.5:5370; load=0.0
   
   [warn 2022/03/16 14:16:12.441 PDT locator-ln <Pooled High Priority Message Processor 2> tid=0x50] XXX LocatorLoadSnapshot.updateConnectionLoadMap current load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.0; currentLoad=0.0
   
   [warn 2022/03/16 14:16:12.441 PDT locator-ln <Pooled High Priority Message Processor 2> tid=0x50] XXX LocatorLoadSnapshot.updateConnectionLoadMap updated load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.0; newLoad=0.0
   ```
   The connectionLoadMap shows 2 groups, namely the null group (default) and the __recv__group group (gateway receiver), each with load=0.0:
   ```
   [warn 2022/03/16 14:16:13.777 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56224; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5370; load=0.0
   ```
   #### Sender connects to the receiver:
   
   With the default of 5 dispatcher threads, 5 connections are made to the receiver. The load goes from 0.0 to 0.0062499996:
   ```
   [warn 2022/03/16 14:16:53.836 PDT locator-ln <locator request thread 2> tid=0x47] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.0
   
   [warn 2022/03/16 14:16:53.836 PDT locator-ln <locator request thread 2> tid=0x47] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.00125
   
   
   [warn 2022/03/16 14:16:53.836 PDT locator-ln <locator request thread 6> tid=0x5c] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.00125
   
   [warn 2022/03/16 14:16:53.836 PDT locator-ln <locator request thread 6> tid=0x5c] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.0025
   
   
   [warn 2022/03/16 14:16:53.837 PDT locator-ln <locator request thread 5> tid=0x5b] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.0025
   
   [warn 2022/03/16 14:16:53.837 PDT locator-ln <locator request thread 5> tid=0x5b] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.00375
   
   
   [warn 2022/03/16 14:16:53.837 PDT locator-ln <locator request thread 4> tid=0x5a] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.00375
   
   [warn 2022/03/16 14:16:53.837 PDT locator-ln <locator request thread 4> tid=0x5a] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.005
   
   
   [warn 2022/03/16 14:16:53.838 PDT locator-ln <locator request thread 3> tid=0x59] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.005
   
   [warn 2022/03/16 14:16:53.838 PDT locator-ln <locator request thread 3> tid=0x59] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.0062499996
   ```
   The connectionLoadMap shows the same 2 groups but now the __recv__group group load is 0.0062499996 for the gateway receiver:
   ```
   [warn 2022/03/16 14:16:55.831 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56224; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5370; load=0.0062499996
   ```
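   
   The steps in that log can be reproduced with a few lines of Java. This is only a sketch: the class name is made up, and it assumes the receiver uses a max-connections value of 800, which is consistent with the 0.00125 step per connection shown above.
   
   ```java
   /** Adds an assumed per-connection load once per connection, in single precision. */
   final class LoadPerConnectionSketch {
     // Assumption: max-connections is 800, so one connection adds 1/800 = 0.00125.
     static final int ASSUMED_MAX_CONNECTIONS = 800;
   
     static float loadAfter(int connections) {
       float perConnection = 1f / ASSUMED_MAX_CONNECTIONS;
       float load = 0f;
       for (int i = 0; i < connections; i++) {
         load += perConnection; // one increment per connection request served
       }
       return load;
     }
   
     public static void main(String[] args) {
       for (int connections = 1; connections <= 5; connections++) {
         System.out.println(connections + " connection(s): load=" + loadAfter(connections));
       }
     }
   }
   ```
   
   Because the increments are accumulated as floats, the sum after 5 connections need not land exactly on 0.00625, which would explain the 0.0062499996 in the log.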
   #### Update the load:
   
   Periodically, the server sends an updated load to the locator.
   ```
   [warn 2022/03/16 14:16:57.464 PDT locator-ln <P2P message reader for 192.168.1.5(ln-1:75228)<v1>:41002 unshared ordered sender uid=5 dom #1 local port=45635 remote port=56270> tid=0x5e] XXX LocatorLoadSnapshot.updateConnectionLoadMap current load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.00625; currentLoad=0.0062499996
   
   [warn 2022/03/16 14:16:57.464 PDT locator-ln <P2P message reader for 192.168.1.5(ln-1:75228)<v1>:41002 unshared ordered sender uid=5 dom #1 local port=45635 remote port=56270> tid=0x5e] XXX LocatorLoadSnapshot.updateConnectionLoadMap updated load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.00625; newLoad=0.00625
   
   [warn 2022/03/16 14:16:57.832 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56224; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5370; load=0.00625
   ```
   #### Update the load after ping connection has been made:
   
   After another connection is made, the load is updated again.
   ```
   [warn 2022/03/16 14:17:02.466 PDT locator-ln <P2P message reader for 192.168.1.5(ln-1:75228)<v1>:41002 unshared ordered sender uid=5 dom #1 local port=45635 remote port=56270> tid=0x5e] XXX LocatorLoadSnapshot.updateConnectionLoadMap current load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.0075; currentLoad=0.00625
   
   [warn 2022/03/16 14:17:02.466 PDT locator-ln <P2P message reader for 192.168.1.5(ln-1:75228)<v1>:41002 unshared ordered sender uid=5 dom #1 local port=45635 remote port=56270> tid=0x5e] XXX LocatorLoadSnapshot.updateConnectionLoadMap updated load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.0075; newLoad=0.0075
   
   [warn 2022/03/16 14:17:03.841 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56224; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5370; load=0.0075
   ```
   #### Connect another sender:
   
   Another sender with 5 dispatcher threads connects, and the load is updated again.
   ```
   [warn 2022/03/16 14:29:44.794 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56600; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5190; load=0.015
   ```
   #### Disconnect one sender:
   
   When a sender disconnects, the load is updated again, dropping from 0.015 to 0.0075.
   ```
   [warn 2022/03/16 14:30:38.843 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56600; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5190; load=0.0075
   ```
   #### Start another receiver:
   
   When another receiver is started, an entry for it is added to the connectionLoadMap with load=0.0.
   ```
   [warn 2022/03/16 14:35:07.535 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56940; load=0.0
   		location=192.168.1.5:56833; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5055; load=0.015
   		location=192.168.1.5:5256; load=0.0
   ```
   #### Two receivers and two senders:
   
   When two receivers are started and two senders are connected, the load is updated and balanced across both receivers (0.00875 each). In this case, the extra connections are pingers: one from each sender to each receiver.
   ```
   [warn 2022/03/16 14:44:32.269 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:57530; load=0.0
   		location=192.168.1.5:57553; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5349; load=0.00875
   		location=192.168.1.5:5025; load=0.00875
   ```
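
The load values in the logs above are consistent with a fixed load per connection. As a rough sketch, assuming each connection contributes 1/800 = 0.00125 of load (800 is an assumed max-connections setting, not something the logs state) and that connections spread evenly across receivers, the numbers can be reproduced as follows. The class and method names are hypothetical and are not part of Geode.

```java
// Hypothetical helper, not Geode code: reproduces the load values seen in the logs.
class ReceiverLoadArithmetic {
  // Assumption: load per connection = 1 / max-connections, with max-connections = 800.
  static final int ASSUMED_MAX_CONNECTIONS = 800;
  static final double LOAD_PER_CONNECTION = 1.0 / ASSUMED_MAX_CONNECTIONS;

  // Load reported by one receiver that currently holds the given number of connections.
  static double load(int connections) {
    return connections * LOAD_PER_CONNECTION;
  }

  // Connections per receiver, assuming an even spread: every sender opens one
  // connection per dispatcher thread in total, plus one pinger to each receiver.
  static int connectionsPerReceiver(int senders, int dispatcherThreads, int receivers) {
    int dispatcherConnections = senders * dispatcherThreads;
    int pingerConnections = senders * receivers;
    return (dispatcherConnections + pingerConnections) / receivers;
  }

  public static void main(String[] args) {
    // One sender, 5 dispatcher threads, before the ping connection: 5 connections.
    System.out.println(String.format("%.5f", load(5)));
    // Same sender after its ping connection: 6 connections.
    System.out.println(String.format("%.5f", load(connectionsPerReceiver(1, 5, 1))));
    // Two senders on one receiver: 12 connections.
    System.out.println(String.format("%.5f", load(connectionsPerReceiver(2, 5, 1))));
    // Two senders, two receivers: 14 connections in total, 7 per receiver.
    System.out.println(String.format("%.5f", load(connectionsPerReceiver(2, 5, 2))));
  }
}
```

Under these assumptions the four scenarios give 0.00625, 0.0075, 0.015 and 0.00875, matching the logged values. A different max-connections setting would scale every value accordingly.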
   #### Load balance senders:
   
   Sender load balancing does not seem to be working properly, and these changes seem to make it work better. I have more analysis on this that I will either post separately or file a JIRA for.
   





[GitHub] [geode] DonalEvans commented on a change in pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #7378:
URL: https://github.com/apache/geode/pull/7378#discussion_r819842187



##########
File path: geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
##########
@@ -479,27 +495,33 @@ void removeFromMap(Map<String, Map<ServerLocationAndMemberId, LoadHolder>> map,
         }
       }
     }
-    Map groupMap = map.get(null);
+    Map<ServerLocationAndMemberId, LoadHolder> groupMap = map.get(null);
     groupMap.remove(locationAndMemberId);
   }
 
   @VisibleForTesting
-  void updateMap(Map map, ServerLocation location, float load, float loadPerConnection) {
-    updateMap(map, location, "", load, loadPerConnection);
-  }
-
-  @VisibleForTesting
-  void updateMap(Map map, ServerLocation location, String memberId, float load,
+  void updateConnectionLoadMap(ServerLocation location, String memberId, float load,
       float loadPerConnection) {
-    Map groupMap = (Map) map.get(null);
-    LoadHolder holder;
-    if (memberId.equals("")) {
-      holder = (LoadHolder) groupMap.get(location);
-    } else {
-      ServerLocationAndMemberId locationAndMemberId =
-          new ServerLocationAndMemberId(location, memberId);
-      holder = (LoadHolder) groupMap.get(locationAndMemberId);
+    ServerLocationAndMemberId locationAndMemberId =
+        new ServerLocationAndMemberId(location, memberId);
+
+    Map<ServerLocationAndMemberId, LoadHolder> groupMap = connectionLoadMap.get(null);
+    LoadHolder holder = groupMap.get(locationAndMemberId);
+    if (holder == null) {
+      groupMap = connectionLoadMap.get(GatewayReceiver.RECEIVER_GROUP);
+      if (groupMap != null) {
+        holder = groupMap.get(locationAndMemberId);
+      }
     }
+
+    if (holder != null) {
+      holder.setLoad(load, loadPerConnection);
+    }
+  }
+
+  void updateQueueLoadMap(ServerLocation location, float load, float loadPerConnection) {

Review comment:
       This method should be annotated with `@VisibleForTesting` as it would be `private` if it wasn't being used in the unit test for this class.
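
For context, the lookup order that the `updateConnectionLoadMap` hunk above introduces can be sketched in isolation: look in the default (`null`) group first, then fall back to the gateway-receiver group. The sketch below is a simplified stand-in, not the Geode implementation: plain `String` keys replace `ServerLocationAndMemberId`, `Holder` replaces `LoadHolder`, and the group name `__recv__group` is taken from the locator logs earlier in the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-in for the two-step lookup in the diff above.
class ConnectionLoadLookupSketch {
  // Assumed to match GatewayReceiver.RECEIVER_GROUP, as printed in the locator logs.
  static final String RECEIVER_GROUP = "__recv__group";

  // Stand-in for LoadHolder: just the two values that setLoad() updates.
  static final class Holder {
    float load;
    float loadPerConnection;

    Holder(float load, float loadPerConnection) {
      this.load = load;
      this.loadPerConnection = loadPerConnection;
    }
  }

  // group name (null = default group) -> member key -> holder
  private final Map<String, Map<String, Holder>> connectionLoadMap = new HashMap<>();

  ConnectionLoadLookupSketch() {
    // The default group always exists, so get(null) never returns null.
    connectionLoadMap.put(null, new HashMap<>());
  }

  void add(String group, String key, Holder holder) {
    Map<String, Holder> groupMap = connectionLoadMap.get(group);
    if (groupMap == null) {
      groupMap = new HashMap<>();
      connectionLoadMap.put(group, groupMap);
    }
    groupMap.put(key, holder);
  }

  // Default group first, then the receiver group; null if the key is in neither.
  Holder find(String key) {
    Holder holder = connectionLoadMap.get(null).get(key);
    if (holder == null) {
      Map<String, Holder> receivers = connectionLoadMap.get(RECEIVER_GROUP);
      if (receivers != null) {
        holder = receivers.get(key);
      }
    }
    return holder;
  }

  // Returns true if a holder was found and updated; unknown keys are ignored.
  boolean update(String key, float load, float loadPerConnection) {
    Holder holder = find(key);
    if (holder == null) {
      return false;
    }
    holder.load = load;
    holder.loadPerConnection = loadPerConnection;
    return true;
  }
}
```

The fallback matters because, in the logged maps, the receiver location appears only under `__recv__group` and not under `group=null`, so a lookup limited to the default group would never find it.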

##########
File path: geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
##########
@@ -413,7 +413,19 @@ public List getServersForQueue(String group, Set<ServerLocation> excludedServers
           new ServerLoad(connectionLoad.getLoad(), connectionLoad.getLoadPerConnection(),
               queueLoad.getLoad(), queueLoad.getLoadPerConnection()));
     }
+    return result;
+  }
+
+  public synchronized Map<ServerLocationAndMemberId, LoadHolder> getGatewayReceiverLoadMap() {

Review comment:
       Could this method be annotated with the `@TestOnly` annotation to make it clear that it's a test-only method please? Also, since it's only used in the unit test for this class, the visibility can be changed to package-private rather than public.

##########
File path: geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();
-    final ServerLocationAndMemberId sli = new ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();;
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), groupServers.get(sli).getLoad(), 0);
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void updateConnectionMapWithServerLocationAndMemberIdGatewayReceiver() {

Review comment:
       This test name is a little unclear. Would it be possible to change it so that it states what behaviour is being tested, under what conditions, and what the expected result is? Particularly to distinguish it from the above test case.

##########
File path: geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();
-    final ServerLocationAndMemberId sli = new ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();;
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), groupServers.get(sli).getLoad(), 0);
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void updateConnectionMapWithServerLocationAndMemberIdGatewayReceiver() {
+    final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+    final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();
+    final ServerLocationAndMemberId servLocAndMemberId =
+        new ServerLocationAndMemberId(serverLocation, uniqueId);
+
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[] {GatewayReceiver.RECEIVER_GROUP},
+        new ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
+        new LocatorLoadSnapshot.LoadHolder(serverLocation, 70, 8, LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, expectedLoadHolder.getLoad(),
+        expectedLoadHolder.getLoadPerConnection());
+
+    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> serverLoadMap =
+        loadSnapshot.getGatewayReceiverLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(servLocAndMemberId).getLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(servLocAndMemberId).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void updateConnectionMapWithServerLocationAndMemberIdTrafficConnectionAndGatewayReceiverGroup() {
+    final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+    final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+    final ServerLocation gatewayReceiverLocation = new ServerLocation("gatewayReciverHost", 111);
+    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();;

Review comment:
       Extra semicolon here.

##########
File path: geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();
-    final ServerLocationAndMemberId sli = new ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();;

Review comment:
       Extra semicolon added here.

##########
File path: geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -661,53 +728,48 @@ public void updateMapWithServerLocationAndMemberIdKeyNotFound() {
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
     final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();
-    final ServerLocationAndMemberId sli = new ServerLocationAndMemberId(serverLocation, uniqueId);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> groupServers = new HashMap<>();
-    Map<String, Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 50, 1);
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 50, 1);
 
-    assertNull(groupServers.get(sli));
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertTrue("Expected connection map to be empty", serverLoadMap.isEmpty());
   }
 
   @Test
-  public void updateMapWithServerLocation() {
+  public void updateQueueMapWithServerLocation() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
     LocatorLoadSnapshot.LoadHolder loadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, LOAD_POLL_INTERVAL);

Review comment:
       This variable is no longer used and can be removed.

##########
File path: geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
##########
@@ -234,11 +229,11 @@ private Object getLocatorListResponse(LocatorListRequest request) {
   }
 
   private Object pickQueueServers(QueueConnectionRequest clientRequest) {
-    Set excludedServers = new HashSet(clientRequest.getExcludedServers());
+    HashSet<ServerLocation> excludedServers = new HashSet<>(clientRequest.getExcludedServers());

Review comment:
       Could this stay using the `Set` interface rather than the concrete `HashSet` implementation please?
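
A minimal sketch of what this suggestion amounts to: the generic type parameter from the new line is kept, but the variable is declared as the `Set` interface. `String` stands in for `ServerLocation`, and the `copyExcluded` helper is hypothetical, used only to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, self-contained illustration; String replaces ServerLocation.
class ExcludedServersSketch {
  // Declares the variable as Set, so callers depend only on the interface while
  // the HashSet copy still protects the request's own collection from mutation.
  static Set<String> copyExcluded(Collection<String> requestedExclusions) {
    Set<String> excludedServers = new HashSet<>(requestedExclusions);
    return excludedServers;
  }

  public static void main(String[] args) {
    Set<String> excluded = copyExcluded(Arrays.asList("host1:40404", "host2:40404"));
    excluded.add("host3:40404"); // mutating the copy leaves the source list untouched
    System.out.println(excluded.size());
  }
}
```

Declaring the interface type keeps the choice of implementation local to the one line that constructs it.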

##########
File path: geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();
-    final ServerLocationAndMemberId sli = new ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();;
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), groupServers.get(sli).getLoad(), 0);
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);

Review comment:
       Could these assertions be changed to use AssertJ, i.e. `assertThat(X).isEqualTo(Y)` please?

##########
File path: geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
##########
@@ -631,28 +632,94 @@ public void testFindBestServersCalledWithNegativeCount() {
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void updateConnectionMapWithServerLocationAndMemberId() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();
-    final ServerLocationAndMemberId sli = new ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();;
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), groupServers.get(sli).getLoad(), 0);
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(serverLocation).getConnectionLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(serverLocation).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void updateConnectionMapWithServerLocationAndMemberIdGatewayReceiver() {
+    final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+    final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+    final String uniqueId = new InternalDistributedMember("localhost", 1).getUniqueId();
+    final ServerLocationAndMemberId servLocAndMemberId =
+        new ServerLocationAndMemberId(serverLocation, uniqueId);
+
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[] {GatewayReceiver.RECEIVER_GROUP},
+        new ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
+        new LocatorLoadSnapshot.LoadHolder(serverLocation, 70, 8, LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, expectedLoadHolder.getLoad(),
+        expectedLoadHolder.getLoadPerConnection());
+
+    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> serverLoadMap =
+        loadSnapshot.getGatewayReceiverLoadMap();
+    assertEquals(expectedLoadHolder.getLoad(),
+        serverLoadMap.get(servLocAndMemberId).getLoad(), 0);
+    assertEquals(expectedLoadHolder.getLoadPerConnection(),
+        serverLoadMap.get(servLocAndMemberId).getLoadPerConnection(), 0);
+  }
+
+  @Test
+  public void updateConnectionMapWithServerLocationAndMemberIdTrafficConnectionAndGatewayReceiverGroup() {

Review comment:
       This test name could also be improved a bit to help make it clearer what the test is doing.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+  private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
+  private static final int LOAD_POLL_INTERVAL_OFFSET = 2000;
+
+  @Test
+  public void testGatewayConnectionCorrectlyLoadBalancedAtStartup() throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 500);

Review comment:
       Rather than calling these three methods at the start of each test, you could move the code in those methods to a `@Before` method, or just have a `@Before` method that calls them.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+  private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;

Review comment:
       This offset seems somewhat arbitrary. How was it determined?

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+  private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
+  private static final int LOAD_POLL_INTERVAL_OFFSET = 2000;
+
+  @Test
+  public void testGatewayConnectionCorrectlyLoadBalancedAtStartup() throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 500);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+  }
+
+  @Test
+  public void testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdown() throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+
+    executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER, null);
+
+    locator1Site1.stop(true);
+
+    // Wait for default load-poll-interval plus offset to expire, so that all servers send load
+    // to locator before continuing with the test
+    GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL + LOAD_POLL_INTERVAL_OFFSET,
+        TimeUnit.MILLISECONDS);
+
+    executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER, null);
+
+    doPutsClientSite2(400, 800);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+  }
+
+  @Test
+  public void testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdownAndGatewayReceiverStopped()
+      throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+
+    executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.STOP_GATEWAYRECEIVER,
+        server1Site1);
+    executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
+        server1Site2);
+    locator1Site1.stop(true);
+
+    // Wait for default load-poll-interval plus offset to expire, so that all servers send load
+    // to locator before continuing with the test
+    GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL + LOAD_POLL_INTERVAL_OFFSET,
+        TimeUnit.MILLISECONDS);
+
+    executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.START_GATEWAYRECEIVER,
+        server1Site1);
+    executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
+        server1Site2);
+    doPutsClientSite2(400, 800);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+  }
+
+  void executeGatewayReceiverActionCommandAndValidateStateSite1(String cliString, MemberVM memberVM)
+      throws Exception {
+    connectGfshToSite(locator2Site1);
+    String command = new CommandStringBuilder(cliString)
+        .addOption(CliStrings.MEMBERS, getMember(memberVM.getVM()).toString())
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    if (cliString.equals(CliStrings.STOP_GATEWAYRECEIVER)) {
+      memberVM.invoke(() -> verifyReceiverState(false));
+      locator2Site1.invoke(
+          () -> validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), false));
+    } else if (cliString.equals(CliStrings.START_GATEWAYRECEIVER)) {
+      memberVM.invoke(() -> verifyReceiverState(true));
+      locator2Site1.invoke(
+          () -> validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), true));
+    }
+  }
+
+  void executeGatewaySenderActionCommandAndValidateStateSite2(String cliString, MemberVM memberVM)
+      throws Exception {
+    connectGfshToSite(locator1Site2);
+    String command;
+    if (memberVM == null) {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(server1Site2, isRunning(cliString));
+      verifyGatewaySenderState(server2Site2, isRunning(cliString));
+    } else {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .addOption(CliStrings.MEMBERS, getMember(memberVM.getVM()).toString())
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(memberVM, isRunning(cliString));
+    }
+  }
+
+  boolean isRunning(String cliString) {
+    return CliStrings.START_GATEWAYSENDER.equals(cliString);
+  }
+
+  void verifyGatewaySenderState(MemberVM memberVM, boolean isRunning) {
+    memberVM.invoke(() -> verifySenderState("ln", isRunning, false));
+    locator1Site2.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()), "ln", isRunning,
+            false));
+  }
+
+  int getGatewayReceiverStats() {
+    Set<GatewayReceiver> gatewayReceivers = ClusterStartupRule.getCache().getGatewayReceivers();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();
+    return stats.getCurrentClientConnections();
+  }
+
+  void checkConnectionLoadBalancedOnServers(MemberVM... members) {
+    int numberOfConnections = members[0].invoke(this::getGatewayReceiverStats);
+
+    for (MemberVM memberVM : members) {
+      await().untilAsserted(() -> assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+          .isLessThan(numberOfConnections + NUM_CONNECTION_PER_SERVER_OFFSET));
+      await().untilAsserted(() -> assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+          .isGreaterThan(numberOfConnections - NUM_CONNECTION_PER_SERVER_OFFSET));
+    }
+  }
+
+  void startWAN() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1);
+    locator1Site1 = clusterStartupRule.startLocatorVM(1, props);
+    locator2Site1 = clusterStartupRule.startLocatorVM(7, props, locator1Site1.getPort());
+
+    // start servers for site #1
+    server1Site1 =
+        clusterStartupRule.startServerVM(3, locator1Site1.getPort(), locator2Site1.getPort());
+    server2Site1 =
+        clusterStartupRule.startServerVM(4, locator1Site1.getPort(), locator2Site1.getPort());
+    server3Site1 =
+        clusterStartupRule.startServerVM(6, locator1Site1.getPort(), locator2Site1.getPort());
+
+    connectGfshToSite(locator1Site1);
+
+    // create partition region on site #1
+    CommandStringBuilder regionCmd = new CommandStringBuilder(CliStrings.CREATE_REGION);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION");
+    regionCmd.addOption(CliStrings.CREATE_REGION__REDUNDANTCOPIES, "1");
+
+    gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+
+    String csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYRECEIVER)
+        .addOption(CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS, "localhost")
+        .getCommandString();
+
+    gfsh.executeAndAssertThat(csb).statusIsSuccess();
+
+    server1Site1.invoke(() -> verifyReceiverState(true));
+    locator2Site1.invoke(
+        () -> validateGatewayReceiverMXBeanProxy(getMember(server1Site1.getVM()), true));
+    locator1Site1.invoke(
+        () -> validateGatewayReceiverMXBeanProxy(getMember(server1Site1.getVM()), true));
+
+    server2Site1.invoke(() -> verifyReceiverState(true));
+    locator2Site1.invoke(
+        () -> validateGatewayReceiverMXBeanProxy(getMember(server2Site1.getVM()), true));
+    locator1Site1.invoke(
+        () -> validateGatewayReceiverMXBeanProxy(getMember(server2Site1.getVM()), true));
+
+    server3Site1.invoke(() -> verifyReceiverState(true));
+    locator2Site1.invoke(
+        () -> validateGatewayReceiverMXBeanProxy(getMember(server3Site1.getVM()), true));
+    locator1Site1.invoke(
+        () -> validateGatewayReceiverMXBeanProxy(getMember(server3Site1.getVM()), true));
+
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE2);
+    props.setProperty(REMOTE_LOCATORS,
+        "localhost[" + locator1Site1.getPort() + "],localhost[" + locator2Site1.getPort() + "]");
+    locator1Site2 = clusterStartupRule.startLocatorVM(2, props);
+
+    // start servers for site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locator1Site2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(9, locator1Site2.getPort());
+
+    // create parallel gateway-sender on site #2
+    connectGfshToSite(locator1Site2);
+    String command = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER)
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, "1")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS,
+            "" + NUMBER_OF_DISPATCHER_THREADS)
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY, "key")
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    server1Site2.invoke(() -> verifySenderState("ln", true, false));
+    server2Site2.invoke(() -> verifySenderState("ln", true, false));
+    locator1Site2.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1Site2.getVM()), "ln", true, false));
+    locator1Site2.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2Site2.getVM()), "ln", true, false));
+
+    // create partition region on site #2
+    regionCmd = new CommandStringBuilder(CliStrings.CREATE_REGION);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION");
+    regionCmd.addOption(CliStrings.CREATE_REGION__GATEWAYSENDERID, "ln");
+    regionCmd.addOption(CliStrings.CREATE_REGION__REDUNDANTCOPIES, "1");
+    gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+  }
+
+  void startClientSite2(int locatorPort) throws Exception {
+    clientSite2 =
+        clusterStartupRule.startClientVM(8, c -> c.withLocatorConnection(locatorPort));
+    clientSite2.invoke(() -> {
+      ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME);
+    });
+  }
+
+  void doPutsClientSite2(int starRange, int stopRange) {

Review comment:
       Typo here, should be "startRange".

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderConnectionLoadBalanceDistributedTest.java
##########
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import static org.apache.geode.cache.server.CacheServer.DEFAULT_LOAD_POLL_INTERVAL;
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class ParallelGatewaySenderConnectionLoadBalanceDistributedTest implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 20;
+  private static final int NUM_CONNECTION_PER_SERVER_OFFSET = 4;
+  private static final int LOAD_POLL_INTERVAL_OFFSET = 2000;
+
+  @Test
+  public void testGatewayConnectionCorrectlyLoadBalancedAtStartup() throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 500);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+  }
+
+  @Test
+  public void testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdown() throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+
+    executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER, null);
+
+    locator1Site1.stop(true);
+
+    // Wait for default load-poll-interval plus offset to expire, so that all servers send load
+    // to locator before continuing with the test
+    GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL + LOAD_POLL_INTERVAL_OFFSET,
+        TimeUnit.MILLISECONDS);
+
+    executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER, null);
+
+    doPutsClientSite2(400, 800);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+  }
+
+  @Test
+  public void testGatewayConnLoadBalancedAfterCoordinatorLocatorShutdownAndGatewayReceiverStopped()
+      throws Exception {
+    startWAN();
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+
+    executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.STOP_GATEWAYRECEIVER,
+        server1Site1);
+    executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
+        server1Site2);
+    locator1Site1.stop(true);
+
+    // Wait for default load-poll-interval plus offset to expire, so that all servers send load
+    // to locator before continuing with the test
+    GeodeAwaitility.await().atLeast(DEFAULT_LOAD_POLL_INTERVAL + LOAD_POLL_INTERVAL_OFFSET,
+        TimeUnit.MILLISECONDS);
+
+    executeGatewayReceiverActionCommandAndValidateStateSite1(CliStrings.START_GATEWAYRECEIVER,
+        server1Site1);
+    executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
+        server1Site2);
+    doPutsClientSite2(400, 800);
+
+    checkConnectionLoadBalancedOnServers(server1Site1, server2Site1, server3Site1);
+  }
+
+  void executeGatewayReceiverActionCommandAndValidateStateSite1(String cliString, MemberVM memberVM)
+      throws Exception {
+    connectGfshToSite(locator2Site1);
+    String command = new CommandStringBuilder(cliString)
+        .addOption(CliStrings.MEMBERS, getMember(memberVM.getVM()).toString())
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    if (cliString.equals(CliStrings.STOP_GATEWAYRECEIVER)) {
+      memberVM.invoke(() -> verifyReceiverState(false));
+      locator2Site1.invoke(
+          () -> validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), false));
+    } else if (cliString.equals(CliStrings.START_GATEWAYRECEIVER)) {
+      memberVM.invoke(() -> verifyReceiverState(true));
+      locator2Site1.invoke(
+          () -> validateGatewayReceiverMXBeanProxy(getMember(memberVM.getVM()), true));
+    }
+  }
+
+  void executeGatewaySenderActionCommandAndValidateStateSite2(String cliString, MemberVM memberVM)
+      throws Exception {
+    connectGfshToSite(locator1Site2);
+    String command;
+    if (memberVM == null) {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(server1Site2, isRunning(cliString));
+      verifyGatewaySenderState(server2Site2, isRunning(cliString));
+    } else {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .addOption(CliStrings.MEMBERS, getMember(memberVM.getVM()).toString())
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(memberVM, isRunning(cliString));
+    }
+  }
+
+  boolean isRunning(String cliString) {
+    return CliStrings.START_GATEWAYSENDER.equals(cliString);
+  }
+
+  void verifyGatewaySenderState(MemberVM memberVM, boolean isRunning) {
+    memberVM.invoke(() -> verifySenderState("ln", isRunning, false));
+    locator1Site2.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()), "ln", isRunning,
+            false));
+  }
+
+  int getGatewayReceiverStats() {
+    Set<GatewayReceiver> gatewayReceivers = ClusterStartupRule.getCache().getGatewayReceivers();
+    GatewayReceiver receiver = gatewayReceivers.iterator().next();
+    CacheServerStats stats = ((CacheServerImpl) receiver.getServer()).getAcceptor().getStats();
+    return stats.getCurrentClientConnections();
+  }
+
+  void checkConnectionLoadBalancedOnServers(MemberVM... members) {
+    int numberOfConnections = members[0].invoke(this::getGatewayReceiverStats);
+
+    for (MemberVM memberVM : members) {
+      await().untilAsserted(() -> assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+          .isLessThan(numberOfConnections + NUM_CONNECTION_PER_SERVER_OFFSET));
+      await().untilAsserted(() -> assertThat(memberVM.invoke(this::getGatewayReceiverStats))
+          .isGreaterThan(numberOfConnections - NUM_CONNECTION_PER_SERVER_OFFSET));
+    }
+  }
+
+  void startWAN() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1);
+    locator1Site1 = clusterStartupRule.startLocatorVM(1, props);
+    locator2Site1 = clusterStartupRule.startLocatorVM(7, props, locator1Site1.getPort());
+
+    // start servers for site #1
+    server1Site1 =
+        clusterStartupRule.startServerVM(3, locator1Site1.getPort(), locator2Site1.getPort());
+    server2Site1 =
+        clusterStartupRule.startServerVM(4, locator1Site1.getPort(), locator2Site1.getPort());
+    server3Site1 =
+        clusterStartupRule.startServerVM(6, locator1Site1.getPort(), locator2Site1.getPort());

Review comment:
       Small nitpick, but the VM numbers for the two sites seem kind of arbitrary. Would it be possible to have site 1 use 0 -> 4 and site 2 use 5 -> 8?







[GitHub] [geode] jvarenina edited a comment on pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
jvarenina edited a comment on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1048513322


   Hi reviewers,
   
   With this solution, each server now sends a CacheServerLoadMessage containing the correct connection load of its gateway-receiver to all locators in the cluster. This happens every 5 seconds, as configured by the load-poll-interval parameter. Additionally, the coordinator locator increases the load each time it provides a server location to the remote gateway-sender through ClientConnectionRequest/ClientConnectionResponse. The locator maintains this load only temporarily, until the next CacheServerLoadMessage is received. This behavior makes sense because the server tracks connection load more accurately than the locator: the locator only increases the connection load based on the connection requests it receives, while the server adjusts the connection load each time a connection is established or closed.
   
   ClientConnectionRequest messages are usually sent to the coordinator locator in bursts when the gateway-sender establishes connections in response to traffic. As a result, the locator's connection load runs well ahead of the servers' connection load, because the servers have not established those connections yet. Suppose that during such a burst a CacheServerLoadMessage arrives at the locator carrying a low load value for one of the gateway-receivers. That receiver will then have the lowest load and be picked more frequently, resulting in unbalanced gateway-sender connections. For this to have a big impact on the load balancing of sender connections, the gateway-receivers must be started with a small delay between them, so that their CacheServerLoadMessages arrive far enough apart to cause an imbalance. If the CacheServerLoadMessages were sent at about the same time, this would not be a problem, because all messages would carry a similar load and would update the locator at about the same time.
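
The race described above can be illustrated with a minimal sketch. It assumes a simplified locator that always hands out the receiver with the lowest estimated load, adds a fixed amount per handed-out connection, and overwrites its estimate whenever a load report arrives. `LocatorLoadSketch`, its methods, and the per-connection value are hypothetical names and numbers chosen for illustration; they are not Geode's actual classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the locator's load table; not Geode code.
final class LocatorLoadSketch {
  // Assumed load added per handed-out connection (illustrative value).
  static final float LOAD_PER_CONNECTION = 0.00125f;

  private final Map<String, Float> estimatedLoad = new LinkedHashMap<>();
  private final Map<String, Integer> handedOut = new LinkedHashMap<>();

  void addReceiver(String name) {
    estimatedLoad.put(name, 0f);
    handedOut.put(name, 0);
  }

  // Connection request: pick the receiver with the lowest estimate and bump it.
  String pickReceiver() {
    String best = null;
    for (Map.Entry<String, Float> entry : estimatedLoad.entrySet()) {
      if (best == null || entry.getValue() < estimatedLoad.get(best)) {
        best = entry.getKey();
      }
    }
    estimatedLoad.merge(best, LOAD_PER_CONNECTION, Float::sum);
    handedOut.merge(best, 1, Integer::sum);
    return best;
  }

  // Load report from a server: the locator's own estimate is overwritten.
  void onLoadReport(String name, float reportedLoad) {
    estimatedLoad.put(name, reportedLoad);
  }

  // Runs a burst of 20 requests with one stale report in the middle and
  // returns how many connections each receiver was handed.
  static int[] simulateBurst() {
    LocatorLoadSketch locator = new LocatorLoadSketch();
    locator.addReceiver("receiver1");
    locator.addReceiver("receiver2");
    for (int i = 0; i < 10; i++) {
      locator.pickReceiver();
    }
    // receiver1 reports its load before any of its connections are established.
    locator.onLoadReport("receiver1", 0f);
    for (int i = 0; i < 10; i++) {
      locator.pickReceiver();
    }
    return new int[] {locator.handedOut.get("receiver1"), locator.handedOut.get("receiver2")};
  }

  public static void main(String[] args) {
    int[] counts = simulateBurst();
    System.out.println("receiver1=" + counts[0] + ", receiver2=" + counts[1]);
  }
}
```

In this sketch receiver1 ends up with more of the 20 connections than receiver2, because the early report makes it look idle for the second half of the burst while receiver2 keeps its inflated estimate.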
   
   I would be really grateful if you could share your opinion on this matter.





[GitHub] [geode] jvarenina commented on pull request #7378: GEODE-10056: Improving gateway-receiver load balancing

Posted by GitBox <gi...@apache.org>.
jvarenina commented on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1048513322


   Hi reviewers,
   
   With this solution, each server will now send a CacheServerLoadMessage containing the correct connection load of the gateway-receiver to all locators in the cluster. This will happen every 5 seconds, as configured by the load-poll-interval parameter. Additionally, the coordinator locator will on its own increase the load each time it provides the server location to the remote gateway-sender in ClientConnectionRequest/ClientConnectionResponse. The locator only maintains the load temporarily, until a CacheServerLoadMessage is received. This makes sense, as the load that the server tracks is more accurate than the load tracked by the locator. The locator only increases the load based on the received connection requests, while the server adjusts the load each time a connection is actually established or disconnected.
   
   When a server/gateway-sender is starting up, ClientConnectionRequest messages are usually sent to the coordinator (locator) in bursts. This results in a load on the locator that is way ahead of the server load, because the servers have not established those connections yet. If, during these bursts, a CacheServerLoadMessage arrives at the locator carrying a low load value for one of the gateway-receivers, then that receiver will be picked more frequently (it will have the lowest load), and that could result in unbalanced gateway-sender connections. For this to have a big impact on the load balancing of sender connections, the gateway-receivers must be started with a small delay between them, so that their CacheServerLoadMessages are sent far enough apart to cause an imbalance. If the CacheServerLoadMessages were sent at a similar time, this would not be a problem, as all messages would carry a similar load and would update the locator at a similar time.
   
   I would be really grateful if you could share your opinion on this matter.





[GitHub] [geode] jvarenina edited a comment on pull request #7378: GEODE-10056: Improving gateway-receiver load balancing

Posted by GitBox <gi...@apache.org>.
jvarenina edited a comment on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1048513322


   Hi reviewers,
   
   With this solution, each server will now send a CacheServerLoadMessage containing the correct connection load of the gateway-receiver to all locators in the cluster. This will happen every 5 seconds, as configured by the load-poll-interval parameter. Additionally, the coordinator locator will on its own increase the load each time it provides the server location to the remote gateway-sender in ClientConnectionRequest/ClientConnectionResponse. The locator only maintains the load temporarily, until a CacheServerLoadMessage is received. This makes sense, as the load that the server tracks is more accurate than the load tracked by the locator. The locator only increases the load based on the received connection requests, while the server adjusts the load each time a connection is actually established or disconnected.
   
   When gateway-sender connections are being established due to traffic, ClientConnectionRequest messages are usually sent to the coordinator (locator) in bursts. This results in a load on the locator that is way ahead of the server load, because the servers have not established those connections yet. If, during these bursts, a CacheServerLoadMessage arrives at the locator carrying a low load value for one of the gateway-receivers, then that receiver will be picked more frequently (it will have the lowest load), and that could result in unbalanced gateway-sender connections. For this to have a big impact on the load balancing of sender connections, the gateway-receivers must be started with a small delay between them, so that their CacheServerLoadMessages are sent far enough apart to cause an imbalance. If the CacheServerLoadMessages were sent at a similar time, this would not be a problem, as all messages would carry a similar load and would update the locator at a similar time.
   
   I would be really grateful if you could share your opinion on this matter.





[GitHub] [geode] jvarenina commented on pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
jvarenina commented on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1072272422


   Hi @boglesby,
   
   I also assumed that the same race condition is possible for the client connections, but I haven't tried to reproduce it. Thanks for pointing this out and lots of other valuable information. Also, thank you for the extensive testing you have done.
   
   If we decide to go with this solution, I agree that we should make the load-poll-interval parameter configurable for gateway receivers. Changing it to a lower value would slightly mitigate the effects of the race condition.
   
   The load-balance gateways command works on the server this way:
   - pauses the gateway-sender
   - destroys all connections and then relies on the mechanism used during connection creation (ClientConnectionRequest/Response) to do the better load balancing
   - resumes the gateway-sender
   
   This command will again result in a burst of connection requests that could hit the issue caused by the race condition.
   
   Maybe, instead of the servers sending load information periodically, the locator could scrape it from the servers (perhaps using CacheServerMXBean) and apply it for all receivers at the same time. The locator could fetch the load when it receives a connection request and its current connection load is stale (e.g., older than 200 ms), as we don't expect many connections from gateway-senders. This way, the locator would at least have an up-to-date connection load taken at a similar time on all servers. This solution should even catch the change in connection load when the load-balance command destroys all connections.
   
   Maybe an algorithm could work this way:
   - When a connection request is received, check if the connection load is stale (older than a new parameter, load-update-frequency=200ms)
     - if yes, then try to get the connection load from all servers asynchronously
       - if the load is received from all servers, then apply it in the locator
       - if any get fails, then check profiles again and immediately retry for all servers
     - Use the current load immediately
   - If no connection request is received, then just get the load periodically, e.g., every 5 seconds (load-poll-interval)
   
   I'm not sure whether this makes sense, as I don't know how fast the locator can scrape the load. I can create a prototype if you think this could work.
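
The staleness check in the algorithm above could look roughly like the following sketch. It is only an illustration under stated assumptions: `StaleLoadRefreshSketch`, the `probe` function (standing in for however the locator would query a server), and the per-connection value are all hypothetical, the refresh is done synchronously instead of asynchronously to keep the example short, and the retry on failure is left out (a failed probe simply keeps the old values).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the proposed locator-side refresh; not Geode code.
final class StaleLoadRefreshSketch {
  // Proposed parameter from the comment: load older than this is stale.
  static final long LOAD_UPDATE_FREQUENCY_MS = 200;
  // Assumed load added per handed-out connection (illustrative value).
  static final float LOAD_PER_CONNECTION = 0.00125f;

  private final Map<String, Float> load = new LinkedHashMap<>();
  private final Function<String, Float> probe; // returns null when a probe fails
  private long lastRefreshMs;
  private int refreshes;

  StaleLoadRefreshSketch(Function<String, Float> probe, long nowMs, String... receivers) {
    this.probe = probe;
    this.lastRefreshMs = nowMs;
    for (String receiver : receivers) {
      load.put(receiver, 0f);
    }
  }

  // Handles one connection request at time nowMs and returns the chosen receiver.
  String onConnectionRequest(long nowMs) {
    if (nowMs - lastRefreshMs > LOAD_UPDATE_FREQUENCY_MS) {
      refreshAll(nowMs);
    }
    String best = null;
    for (Map.Entry<String, Float> entry : load.entrySet()) {
      if (best == null || entry.getValue() < load.get(best)) {
        best = entry.getKey();
      }
    }
    load.merge(best, LOAD_PER_CONNECTION, Float::sum);
    return best;
  }

  // Applies the probed loads only if every receiver answered.
  private void refreshAll(long nowMs) {
    Map<String, Float> fresh = new LinkedHashMap<>();
    for (String receiver : load.keySet()) {
      Float value = probe.apply(receiver);
      if (value == null) {
        return; // one probe failed: keep the old values
      }
      fresh.put(receiver, value);
    }
    load.putAll(fresh);
    lastRefreshMs = nowMs;
    refreshes++;
  }

  int refreshes() {
    return refreshes;
  }
}
```

Applying the probed values for all receivers in one step is the point of the design: every estimate is then based on a measurement taken at roughly the same time, so no single receiver looks idle just because its report arrived first.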





[GitHub] [geode] boglesby commented on pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
boglesby commented on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1068616384


   I'm not sure how to resolve the race condition you mention, but I see similar behavior with client/server connections.
   
   If a burst of connections is requested and none of those are made before the next load is received from the server, then the locator's load for that server gets reset back to zero.
   
   A burst of connections (10 in this case) causes the load to go from 0.0 to 0.012499998:
   ```
   [warn 2022/03/15 14:38:37.905 PDT locator <locator request thread 1> tid=0x24] XXX LocatorLoadSnapshot.getServerForConnection potentialServers={192.168.1.5:51249@192.168.1.5(server1:30200)<v1>:41001=LoadHolder[0.0, 192.168.1.5:51249, loadPollInterval=5000, 0.00125]}
   
   [warn 2022/03/15 14:38:37.906 PDT locator <locator request thread 1> tid=0x24] XXX LocatorLoadSnapshot.getServerForConnection selectedServer=192.168.1.5:51249; loadBeforeUpdate=0.0
   
   [warn 2022/03/15 14:38:37.907 PDT locator <locator request thread 1> tid=0x24] XXX LoadHolder.incConnections location=192.168.1.5:51249; load=0.00125
   
   [warn 2022/03/15 14:38:37.907 PDT locator <locator request thread 1> tid=0x24] XXX LocatorLoadSnapshot.getServerForConnection selectedServer=192.168.1.5:51249; loadAfterUpdate=0.00125
   
   ...
   
   [warn 2022/03/15 14:38:38.005 PDT locator <locator request thread 1> tid=0x24] XXX LocatorLoadSnapshot.getServerForConnection potentialServers={192.168.1.5:51249@192.168.1.5(server1:30200)<v1>:41001=LoadHolder[0.011249999, 192.168.1.5:51249, loadPollInterval=5000, 0.00125]}
   
   [warn 2022/03/15 14:38:38.005 PDT locator <locator request thread 1> tid=0x24] XXX LocatorLoadSnapshot.getServerForConnection selectedServer=192.168.1.5:51249; loadBeforeUpdate=0.011249999
   
   [warn 2022/03/15 14:38:38.005 PDT locator <locator request thread 1> tid=0x24] XXX LoadHolder.incConnections location=192.168.1.5:51249; load=0.012499998
   
   [warn 2022/03/15 14:38:38.005 PDT locator <locator request thread 1> tid=0x24] XXX LocatorLoadSnapshot.getServerForConnection selectedServer=192.168.1.5:51249; loadAfterUpdate=0.012499998
   ```
   If none of those connections are made before the next load is sent by that server, its load goes from 0.012499998 to 0.0:
   ```
   [warn 2022/03/15 14:39:25.140 PDT locator <P2P message reader for 192.168.1.5(server1:30200)<v1>:41001 unshared ordered sender uid=5 dom #1 local port=55139 remote port=51286> tid=0x56] XXX LocatorLoadSnapshot.updateLoad about to update connectionLoadMap location=192.168.1.5:51249; load=0.0; loadPerConnection=0.00125
   
   [warn 2022/03/15 14:39:25.140 PDT locator <P2P message reader for 192.168.1.5(server1:30200)<v1>:41001 unshared ordered sender uid=5 dom #1 local port=55139 remote port=51286> tid=0x56] XXX LocatorLoadSnapshot.updateMap location=192.168.1.5:51249; loadBeforeUpdate=0.012499998
   
   [warn 2022/03/15 14:39:25.141 PDT locator <P2P message reader for 192.168.1.5(server1:30200)<v1>:41001 unshared ordered sender uid=5 dom #1 local port=55139 remote port=51286> tid=0x56] XXX LocatorLoadSnapshot.updateMap location=192.168.1.5:51249; loadAfterUpdate=0.0
   
   [warn 2022/03/15 14:39:25.141 PDT locator <P2P message reader for 192.168.1.5(server1:30200)<v1>:41001 unshared ordered sender uid=5 dom #1 local port=55139 remote port=51286> tid=0x56] XXX LocatorLoadSnapshot.updateLoad done update connectionLoadMap location=192.168.1.5:51249
   ```
   The load for the next request starts at 0.0 again:
   ```
   [warn 2022/03/15 14:39:33.475 PDT locator <locator request thread 2> tid=0x54] XXX LocatorLoadSnapshot.getServerForConnection potentialServers={192.168.1.5:51249@192.168.1.5(server1:30200)<v1>:41001=LoadHolder[0.0, 192.168.1.5:51249, loadPollInterval=5000, 0.00125]}
   
   [warn 2022/03/15 14:39:33.475 PDT locator <locator request thread 2> tid=0x54] XXX LocatorLoadSnapshot.getServerForConnection selectedServer=192.168.1.5:51249; loadBeforeUpdate=0.0
   
   [warn 2022/03/15 14:39:33.475 PDT locator <locator request thread 2> tid=0x54] XXX LoadHolder.incConnections location=192.168.1.5:51249; load=0.00125
   
   [warn 2022/03/15 14:39:33.475 PDT locator <locator request thread 2> tid=0x54] XXX LocatorLoadSnapshot.getServerForConnection selectedServer=192.168.1.5:51249; loadAfterUpdate=0.00125
   
   ...
   ```
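   
   The sequence in the logs above can be reproduced with a toy model. This is a minimal sketch, not Geode's `LocatorLoadSnapshot`: the class `ToyLoadSnapshot` and its methods are made up for illustration and only mimic the two operations visible in the logs (a local increment per handed-out connection, and an overwrite when a server report arrives).
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   /** Toy model (hypothetical, not Geode code) of the locator-side load estimate. */
   class ToyLoadSnapshot {
     private final Map<String, Float> connectionLoad = new HashMap<>();
   
     /** The locator hands out a connection and optimistically bumps its local estimate. */
     float incConnections(String location, float loadPerConnection) {
       return connectionLoad.merge(location, loadPerConnection, Float::sum);
     }
   
     /** A load report from the server overwrites whatever the locator had estimated. */
     void updateLoad(String location, float reportedLoad) {
       connectionLoad.put(location, reportedLoad);
     }
   
     float getLoad(String location) {
       return connectionLoad.getOrDefault(location, 0f);
     }
   
     public static void main(String[] args) {
       ToyLoadSnapshot snapshot = new ToyLoadSnapshot();
       String server = "192.168.1.5:51249";
       for (int i = 0; i < 10; i++) {
         snapshot.incConnections(server, 0.00125f); // burst of 10 requests
       }
       System.out.println("after burst: " + snapshot.getLoad(server));
       // The server reports before any of the 10 connections were actually made.
       snapshot.updateLoad(server, 0.0f);
       System.out.println("after report: " + snapshot.getLoad(server));
     }
   }
   ```
   
   In this model the ten handed-out connections are forgotten as soon as the report arrives, so the next burst is balanced against an estimate that is too low.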
   One thing to note is that the load is only sent every load-poll-interval (default=5 seconds) if it has changed. If it hasn't changed, then it only gets sent at the forced update frequency (which is 10 * 5 seconds by default).
   
   There is an integer system property to control that frequency too:
   ```
   private static final int FORCE_LOAD_UPDATE_FREQUENCY = getInteger(
     GeodeGlossary.GEMFIRE_PREFIX + "BridgeServer.FORCE_LOAD_UPDATE_FREQUENCY", 10);
   ```
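   
   The send decision described above can be sketched like this. It is a hedged illustration only: `LoadReportPolicy` and `shouldSend` are hypothetical names and this is not Geode's actual load monitor code; it just encodes "send when changed, otherwise send every Nth poll".
   
   ```java
   /** Hypothetical sketch of the "send if changed, else every Nth poll" rule. */
   class LoadReportPolicy {
     private final int forceUpdateFrequency; // 10 by default, per the property above
     private float lastSentLoad = Float.NaN; // NaN so the very first poll always sends
     private int pollsSinceLastSend = 0;
   
     LoadReportPolicy(int forceUpdateFrequency) {
       this.forceUpdateFrequency = forceUpdateFrequency;
     }
   
     /** Called once per load-poll-interval; returns true if the load should be sent now. */
     boolean shouldSend(float currentLoad) {
       boolean changed = Float.compare(currentLoad, lastSentLoad) != 0;
       pollsSinceLastSend++;
       if (changed || pollsSinceLastSend >= forceUpdateFrequency) {
         lastSentLoad = currentLoad;
         pollsSinceLastSend = 0;
         return true;
       }
       return false;
     }
   }
   ```
   
   With a 5 second poll interval and a frequency of 10, an unchanged load is re-sent only every 50 seconds in this sketch, which matches the 10 * 5 seconds mentioned above.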
   The load-poll-interval is configurable, but currently only for the cache server, not the gateway receiver. It probably wouldn't be too hard to add this support to the gateway receiver.
   
   Also, there is a gfsh load-balance gateway-sender command that could help alleviate this condition.
   
   I'm still reviewing the PR.





[GitHub] [geode] boglesby commented on pull request #7378: GEODE-10056: Improve gateway-receiver load balance

Posted by GitBox <gi...@apache.org>.
boglesby commented on pull request #7378:
URL: https://github.com/apache/geode/pull/7378#issuecomment-1071176231


   I ran a few tests with some extra logging on these changes. They look good.
   
   #### The receiver exchanges profiles with the locator:
   ```
   [warn 2022/03/16 14:16:12.440 PDT locator-ln <Pooled High Priority Message Processor 2> tid=0x50] XXX LocatorLoadSnapshot.updateConnectionLoadMap location=192.168.1.5:5370; load=0.0
   
   [warn 2022/03/16 14:16:12.441 PDT locator-ln <Pooled High Priority Message Processor 2> tid=0x50] XXX LocatorLoadSnapshot.updateConnectionLoadMap current load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.0; currentLoad=0.0
   
   [warn 2022/03/16 14:16:12.441 PDT locator-ln <Pooled High Priority Message Processor 2> tid=0x50] XXX LocatorLoadSnapshot.updateConnectionLoadMap updated load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.0; newLoad=0.0
   ```
   The connectionLoadMap shows 2 groups, namely the null group (default) and the __recv__group group (gateway receiver), each with load=0.0:
   ```
   [warn 2022/03/16 14:16:13.777 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56224; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5370; load=0.0
   ```
   #### Sender connects to the receiver:
   
   With the default of 5 dispatcher threads, 5 connections are made to the receiver. The load goes from 0.0 to 0.0062499996:
   ```
   [warn 2022/03/16 14:16:53.836 PDT locator-ln <locator request thread 2> tid=0x47] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.0
   
   [warn 2022/03/16 14:16:53.836 PDT locator-ln <locator request thread 2> tid=0x47] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.00125
   
   
   [warn 2022/03/16 14:16:53.836 PDT locator-ln <locator request thread 6> tid=0x5c] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.00125
   
   [warn 2022/03/16 14:16:53.836 PDT locator-ln <locator request thread 6> tid=0x5c] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.0025
   
   
   [warn 2022/03/16 14:16:53.837 PDT locator-ln <locator request thread 5> tid=0x5b] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.0025
   
   [warn 2022/03/16 14:16:53.837 PDT locator-ln <locator request thread 5> tid=0x5b] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.00375
   
   
   [warn 2022/03/16 14:16:53.837 PDT locator-ln <locator request thread 4> tid=0x5a] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.00375
   
   [warn 2022/03/16 14:16:53.837 PDT locator-ln <locator request thread 4> tid=0x5a] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.005
   
   
   [warn 2022/03/16 14:16:53.838 PDT locator-ln <locator request thread 3> tid=0x59] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadBeforeUpdate=0.005
   
   [warn 2022/03/16 14:16:53.838 PDT locator-ln <locator request thread 3> tid=0x59] XXX LocatorLoadSnapshot.getServerForConnection group=__recv__group; server=192.168.1.5:5370; loadAfterUpdate=0.0062499996
   ```
   The connectionLoadMap shows the same 2 groups but now the __recv__group group load is 0.0062499996 for the gateway receiver:
   ```
   [warn 2022/03/16 14:16:55.831 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56224; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5370; load=0.0062499996
   ```
   #### Update the load:
   
   Periodically, the server sends an updated load to the locator.
   ```
   [warn 2022/03/16 14:16:57.464 PDT locator-ln <P2P message reader for 192.168.1.5(ln-1:75228)<v1>:41002 unshared ordered sender uid=5 dom #1 local port=45635 remote port=56270> tid=0x5e] XXX LocatorLoadSnapshot.updateConnectionLoadMap current load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.00625; currentLoad=0.0062499996
   
   [warn 2022/03/16 14:16:57.464 PDT locator-ln <P2P message reader for 192.168.1.5(ln-1:75228)<v1>:41002 unshared ordered sender uid=5 dom #1 local port=45635 remote port=56270> tid=0x5e] XXX LocatorLoadSnapshot.updateConnectionLoadMap updated load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.00625; newLoad=0.00625
   
   [warn 2022/03/16 14:16:57.832 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56224; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5370; load=0.00625
   ```
   #### Update the load after ping connection has been made:
   
   After another connection is made, the load is updated again.
   ```
   [warn 2022/03/16 14:17:02.466 PDT locator-ln <P2P message reader for 192.168.1.5(ln-1:75228)<v1>:41002 unshared ordered sender uid=5 dom #1 local port=45635 remote port=56270> tid=0x5e] XXX LocatorLoadSnapshot.updateConnectionLoadMap current load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.0075; currentLoad=0.00625
   
   [warn 2022/03/16 14:17:02.466 PDT locator-ln <P2P message reader for 192.168.1.5(ln-1:75228)<v1>:41002 unshared ordered sender uid=5 dom #1 local port=45635 remote port=56270> tid=0x5e] XXX LocatorLoadSnapshot.updateConnectionLoadMap updated load for location=192.168.1.5:5370; group=__recv__group; inputLoad=0.0075; newLoad=0.0075
   
   [warn 2022/03/16 14:17:03.841 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56224; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5370; load=0.0075
   ```
   #### Connect another sender:
   
   Another sender with 5 dispatcher threads connects, and the load is updated again.
   ```
   [warn 2022/03/16 14:29:44.794 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56600; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5190; load=0.015
   ```
   #### Disconnect one sender:
   
   When a sender disconnects, the load is updated again.
   ```
   [warn 2022/03/16 14:30:38.843 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56600; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5190; load=0.0075
   ```
   #### Start another receiver:
   
   When another receiver is started, an entry for it is added to the connectionLoadMap with load=0.0.
   ```
   [warn 2022/03/16 14:35:07.535 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:56940; load=0.0
   		location=192.168.1.5:56833; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5055; load=0.015
   		location=192.168.1.5:5256; load=0.0
   ```
   #### Two receivers and two senders:
   
   When two receivers are started and two senders are connected, the load is updated (and balanced). In this case, the extra connections are pingers - one from each sender to each receiver.
   ```
   [warn 2022/03/16 14:44:32.269 PDT locator-ln <Thread-14> tid=0x43] XXX LocatorLoadSnapshot.logConnectionLoadMap
   The connectionLoadMap contains the following 2 entries:
   	group=null
   		location=192.168.1.5:57530; load=0.0
   		location=192.168.1.5:57553; load=0.0
   	group=__recv__group
   		location=192.168.1.5:5349; load=0.00875
   		location=192.168.1.5:5025; load=0.00875
   ```
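   
   The load values in these logs are consistent with simple arithmetic on the connection counts. The sketch below is only a worked check, assuming the `loadPerConnection` of 0.00125 seen in the earlier logs, one pinger connection from each sender to each receiver, and dispatcher connections that split evenly across receivers; `ReceiverLoadMath` is a hypothetical helper, not Geode code.
   
   ```java
   /** Worked arithmetic for the logged loads; a sketch under the assumptions stated above. */
   class ReceiverLoadMath {
     static final float LOAD_PER_CONNECTION = 0.00125f; // loadPerConnection value from the logs
   
     /** Connections on one receiver: an even share of dispatcher connections plus one pinger per sender. */
     static int connectionsPerReceiver(int senders, int dispatcherThreads, int receivers) {
       int dispatcherConnections = senders * dispatcherThreads;
       return dispatcherConnections / receivers + senders;
     }
   
     static float load(int connections) {
       return connections * LOAD_PER_CONNECTION;
     }
   
     public static void main(String[] args) {
       // One sender, one receiver: 5 dispatcher connections + 1 pinger = 6 connections.
       System.out.println(load(connectionsPerReceiver(1, 5, 1)));
       // Two senders, one receiver: 10 dispatcher connections + 2 pingers = 12 connections.
       System.out.println(load(connectionsPerReceiver(2, 5, 1)));
       // Two senders, two receivers: 5 dispatcher connections + 2 pingers = 7 per receiver.
       System.out.println(load(connectionsPerReceiver(2, 5, 2)));
     }
   }
   ```
   
   These three cases correspond to the logged loads of 0.0075, 0.015 and 0.00875 respectively, up to float rounding.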
   #### Load balance senders:
   
   This feature does not seem to be working properly. These changes seem to make it work better. I have some further analysis on this that I will either post separately or file a JIRA for.
   

